RxJava五种被观察者为Flowable, Observable,Single, Completable, Maybe。
五种被观察者可通过toFlowable, toObservable,toSingle, toCompletable, toMaybe相互转换。
1、Flowable
1.1、Flowable简介
Flowable类,用于实现Reactive-Streams模式,并提供工厂方法,中间运算符以及使用反应式数据流的能力。
Reactive-Streams使用Flowable运行,Flowable实现了Publishers。因此,许多运算符直接接受Publishers,并允许与其他Reactive-Streams的实现进行直接交互操作
public abstract class Flowable<T> implements Publisher<T>
Flowable为操作符提供128个元素的默认缓冲区大小,可通过bufferSize() 方法获取,可通过系统参数rx2.buffer-size全局覆盖。但是大多数运算符都有重载,允许显式设置其内部缓冲区大小。
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
}
/**
* Returns the default internal buffer size used by most async operators.
* <p>The value can be overridden via system parameter {@code rx2.buffer-size}
* <em>before</em> the Flowable class is loaded.
* @return the default internal buffer size.
*/
public static int bufferSize() {
return BUFFER_SIZE;
}
1.2、Flowable官方图解
1)看到上图有点疑问,不是在说Flowable嘛,怎么图解里的说明是Observable呢?
2)其实在官方文档里面Flowable和Observable都使用的是上面这个图解,因此这两个类肯定是提供相似功能,既然是相似,那么这幅图就是他们的共性,那不同的地方是什么呢?
不同之处是:Flowable支持Backpressure,Observable不支持Backpressure;只有在需要处理背压问题时,才需要使用Flowable。
由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题;
所以,如果能够确定:
1、上下游运行在同一个线程中,
2、上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
3、上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用Flowable,以免影响性能。
类似于Observable,在使用Flowable时,也可以通过create操作符创建发射数据流,代码如下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
.subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}
@Override
public void onNext(Integer integer) {
System.out.println("接收----> " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("接收----> 完成");
}
});
运行结果如下:
System.out: 接收----> 1
System.out: 接收----> 2
System.out: 接收----> 3
System.out: 接收----> 完成
发射与处理数据流在形式上与Observable大同小异,发射器中均有onNext,onError,onComplete方法,订阅器中也均有onSubscribe,onNext,onError,onComplete方法。
但是在细节方面还是有三点不同:
一、create方法中多了一个BackpressureStrategy类型的参数。
二、订阅器Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription,多了行代码:s.request(Long.MAX_VALUE);
三、Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter
1.3、Backpressure
在通过create操作符创建Flowable时,多了一个BackpressureStrategy类型的参数,BackpressureStrategy是个枚举,源码如下:
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
当上游发送数据的速度快于下游接收数据的速度,且运行在不同的线程中时,Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限为128
BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池缓存数据的策略。在源码FlowableCreate类中,可以看到五个泛型分别对应五个java类,通过代理模式对原始的发射器进行了包装:
Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
ERROR
对应于ErrorAsyncEmitter
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 338953216916120960L;
ErrorAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
onOverflow方法中可以看到,在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。父类的onNext中,在判断get() != 0,即缓存池未满的情况下,才会让被代理类调用onNext方法。运行如下代码:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 1; i <= 129; i++) {
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10000);
} catch (InterruptedException ignore) {
}
System.out.println(integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("接收----> 完成");
}
});
创建并通过Flowable发射129条数据,Subscriber的onNext方法睡10秒之后再开始接收,运行后会发现控制台打印如下异常:
W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
如果将Flowable发射数据的条数改为128,则不会出现此异常。
DROP
对应于DropAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
void onOverflow() {
// nothing to do
}
}
可以看到,DropAsyncEmitter的onOverflow是个空方法,没有执行任何操作。所以在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据。
存池中数据的清理,并不是Subscriber接收一条,便清理一条,而是存在一个延迟,等累积一段时间后统一清理一次。也就是Subscriber接收到第96条数据时,缓存池才开始清理数据,之后Flowable发射的数据才得以放入。如果数据处于缓存池存满的状态时,则被丢弃。
LATEST
对应于LatestAsyncEmitter
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。
BUFFER
Flowable处理背压的默认策略,对应于BufferAsyncEmitter
其部分源码为:
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 2427151001689639875L;
final SpscLinkedArrayQueue<T> queue;
. . . . . .
final AtomicInteger wip;
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
. . . . . .
}
在其构造方法中可以发现,其内部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。
和使用Observable时一样,都会导致内存剧增,最后导致OOM,不同的是使用Flowable内存增长的速度要慢得多,那是因为基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。
MISSING
对应于MissingEmitter
static final class MissingEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 3776720187248809713L;
MissingEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
actual.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
}
可以发现,在传递数据时
actual.onNext(t);
并没有对缓存池的状态进行判断,所以在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
onBackpressureXXX背压操作符:Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST
例如代码
Flowable.range(0, 500)
.onBackpressureDrop()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println(integer);
}
});
Subscription
Subscription与Disposable均是观察者与可观察对象建立订阅状态后回调回来的参数,如同通过Disposable的dispose()方法可以取消Observer与Oberverable的订阅关系一样,通过Subscription的cancel()方法也可以取消Subscriber与Flowable的订阅关系。
不同的是接口Subscription中多了一个方法request(long n),如上面代码中的:
s.request(Long.MAX_VALUE);
Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。如果不显示调用request则默认下游的需求量为零,上游Flowable发射的数据不会交给下游Subscriber处理。
多次调用request,数字会累积。
上游并没有根据下游的实际需求,发送数据,而是能发送多少,就发送多少,不管下游是否需要。而且超出下游需求之外的数据,仍然放到了异步缓存池中。这点我们可以通过以下代码来验证:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 130; i++) {
System.out.println("发射---->" + i);
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("接收------>" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("接收------>完成");
}
});
通过Flowable发射130条数据,通过s.request(1)设置下游的数据请求量为1条,设置缓存策略为BackpressureStrategy.ERROR,如果异步缓存池超限,会导致MissingBackpressureException异常。久违的异常出现了,所以超出下游需求之外的数据,仍然放到了异步缓存池中,并导致缓存池溢出。
那么上游如何才能按照下游的请求数量发送数据呢,虽然通过request可以设置下游的请求数量,但是上游并没有获取到这个数量,如何获取呢?这便需要用到Flowable与Observable的第三点区别,Flowable特有的发射器FlowableEmitter
FlowableEmitter
flowable的发射器FlowableEmitter与observable的发射器ObservableEmitter均继承自Emitter
比较两者源码可以发现:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}
与
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
接口FlowableEmitter中多了一个方法
long requested();
上游在发送数据的时候并不需要考虑下游需不需要,而只需要考虑异步缓存池中是否放得下,放得下便发,放不下便暂停。所以,通过e.requested()获取到的值,并不是下游真正的数据请求数量,而是异步缓存池中可放入数据的数量。数据放入缓存池中后,再由缓存池按照下游的数据请求量向下传递,待到传递完的数据累积到95条之后,将其清除,腾出空间存放新的数据。如果下游处理数据缓慢,则缓存池向下游传递数据的速度也相应变慢,进而没有传递完的数据可清除,也就没有足够的空间存放新的数据,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。
我们可以通过这个方法来获取当前未完成的请求数量,上游只需要在e.requested()等于0时,暂停发射数据,便可解决背压问题。
最终方案
下面,对其通过Flowable做些改进,让其既不会产生背压问题,也不会引起异常或者数据丢失。
代码如下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
if (e.requested() == 0) continue;//此处添加代码,让flowable按需发送数据
System.out.println("发射---->" + i);
i++;
e.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
s.request(1); //设置初始请求数据量为1
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(50);
System.out.println("接收------>" + integer);
mSubscription.request(1);//每接收到一条数据增加一条请求量
} catch (InterruptedException ignore) {
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
下游处理数据的速度Thread.sleep(50)赶不上上游发射数据的速度,不同的是,我们在下游onNext(Integer integer) 方法中,每接收一条数据增加一条请求量,
mSubscription.request(1)
在上游添加代码
if(e.requested()==0)continue;
让上游按需发送数据,上游严格按照下游的需求量发送数据,不会产生MissingBackpressureException异常,或者丢失数据。
2、Observable
2.1、Observable简介
Observable类是不支持背压的,Observable是Reactive的一个抽象基类,Observable提供工厂方法,中间运算符以及消费同步和/或异步数据流的功能。
Observable类中的多数运算符接受一个或者多个ObservableSource,ObservableSource是非背压的基本接口,Observable实现了这个接口。
public abstract class Observable
默认情况下,Observable的为其运算符提供128个元素的缓冲区大小运行,可看考Flowable.bufferSize(),可以通过系统参数rx2.buffer-size全局覆盖。但是,大多数运算符都有重载,允许设置其内部缓冲区大小。
2.2、Flowable和Observable对比
在上面已经说明了二者最大的区别。
官方也给出的解释是:
The design of this class was derived from the Reactive-Streams design and specification by removing any backpressure-related infrastructure and implementation detail, replacing the org.reactivestreams.Subscription with Disposable as the primary means to dispose of a flow.
中文翻译:
该类的设计源自Reactive-Streams设计和规范,通过删除任何与背压相关的基本结构和实现细节,将Disposable替换为org.reactivestreams.Subscription作为处理流的主要方式。
从代码层面上做简单的说明,Flowable实现了Publisher接口,Publisher源码如下:
public interface Publisher<T> {
/**
* Request {@link Publisher} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
* <p>
* Each {@link Subscription} will work for only a single {@link Subscriber}.
* <p>
* A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
* <p>
* If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
* signal the error via {@link Subscriber#onError}.
*
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
*/
public void subscribe(Subscriber<? super T> s);
}
Observable实现了ObservableSource接口,ObservableSource源码如下1
2
3
4
5
6
7
8
9public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
对比ObservableSource和Publisher,都有一个同名的接口subscribe()
2.4、形象理解ObservableSource和Publisher有何异同
ObservableSource:可观察源
Publisher:发布者
subscribe:订阅
Subscriber:订阅者
Observer:观察者
对于ObservableSource,可以将subscribe(Observer observer)理解为Observer通过subscribe订阅了ObservableSource
对于Publisher,可以将subscribe(Subscriber s)理解为Subscriber通过subscribe订阅了Publisher
上面的解释可能比较抽象,通俗的举个例子,来个角色扮演
第一组:报刊(ObservableSource)、报刊订阅者(Observer)、订阅报刊的行为(subscribe)
第二组:报刊发布人(Publisher)、报刊订阅者(Subscriber)、订阅报刊的行为(subscribe)
把这个场景串起来讲就是:报刊订阅者订阅了报刊,或者说报刊订阅者在报刊发布人手中订阅了报刊。
这其实是典型的观察者模式,所不同的是信息的发布者是ObservableSource还是Publisher,信息的订阅者是Observer还是Subscriber,统一的行为都是subscribe。
3、Single
3.1、Single简介
public abstract class Single<T> implements SingleSource<T>
Single实现了SingleSource
/**
* Represents a basic {@link Single} source base interface,
* consumable via an {@link SingleObserver}.
* <p>
* This class also serves the base type for custom operators wrapped into
* Single via {@link Single#create(SingleOnSubscribe)}.
*
* @param <T> the element type
* @since 2.0
*/
public interface SingleSource<T> {
/**
* Subscribes the given SingleObserver to this SingleSource instance.
* @param observer the SingleObserver, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(SingleObserver<? super T> observer);
}
Single类为单个值响应实现Reactive Pattern。
Single和Observable类似,所不同的是Single只能发出一个值,要么发射成功要么发射失败,也没有“onComplete”作为完成时的回调
Single类实现了基类SingleSource的接口,SingleObserver作为Single发出来的消息的默认消费者,SingleObserver通过subscribe(SingleObserver<? super T> observer)在Single中订阅消息
public interface SingleObserver<T> {
/**
* Provides the SingleObserver with the means of cancelling (disposing) the
* connection (channel) with the Single in both
* synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);
/**
* Notifies the SingleObserver with a single item and that the {@link Single} has finished sending
* push-based notifications.
* <p>
* The {@link Single} will not call this method if it calls {@link #onError}.
*
* @param value
* the item emitted by the Single
*/
void onSuccess(T value);
/**
* Notifies the SingleObserver that the {@link Single} has experienced an error condition.
* <p>
* If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
*
* @param e
* the exception encountered by the Single
*/
void onError(Throwable e);
}
3.2、Single与Observable的区别
Single只能发送单个消息,不能发送消息流,而且观察者接收到消息也只有两种情况,要么接收成功,要么接收失败
3.3、Single官方图解
4、Completable
4.1、Completable简介
Completable类表示延迟计算,没有任何值,只表示完成或异常。
Completable的行为类似于Observable,在计算完成后只能发出完成或错误信号,由onComplete或onError接口来处理,没有onNext或onSuccess等回调接口
Completable实现了基类CompletableSource的接口,CompletableObserver通过subscribe()方法在Completable处订阅消息。
Completable遵循协议:onSubscribe (onComplete | onError)
public abstract class Completable implements CompletableSource
public interface CompletableSource {
/**
* Subscribes the given CompletableObserver to this CompletableSource instance.
* @param cs the CompletableObserver, not null
* @throws NullPointerException if {@code cs} is null
*/
void subscribe(CompletableObserver cs);
}
public interface CompletableObserver {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable e);
}
从源码中我们可以看到CompletableObserver里面有三个接口:
1)onSubscribe中传入参数Disposable,由Completable调用一次,在CompletableObserver实例上设置Disposable,然后可以随时取消订阅。
2)onComplete一旦延迟计算正常完成将会被调用
3)onError 一旦延迟计算抛出异常将会被调用
4.3、Completable示例
注意: 通过Disposable调用dispose()取消订阅,后面的消息无法接收。
运行下面的例子,可以看到在调用dispose后,onStart被回调后,后续的消息就收不到了;去掉dispose,onStart回调后,三秒后onComplete将会被回调
private void doCompletable() {
Disposable d = Completable.complete()
.delay(3, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete!");
}
});
d.dispose();
}
运行完毕的结果是:
10-02 11:10:34.797 15565-15565/hq.demo.net I/System.out: Started
注释d.dispose()后在运行结果是:
10-02 14:34:19.490 23232-23232/hq.demo.net I/System.out: Started
10-02 14:34:22.492 23232-23483/hq.demo.net I/System.out: Done!
上面使用的是DisposableCompletableObserver通过subscribeWith来订阅消息,返回一个Disposable可以通过dispose来取消订阅关系,DisposableCompletableObserver是CompletableObserve的子类,只是增加了可取消订阅的功能。当然也能通过CompletableObserve方法操作,但是无法取消订阅关系,除此外没什么本质区别。
5、Maybe
5.1、Maybe简介
Maybe类表示延迟计算和单个值的发射,这个值可能根本没有或异常。
Maybe类实现MaybeSource的接口,MaybeObserver通过subscribe(MaybeObserver)在Maybe处订阅消息
Maybe遵循协议:onSubscribe (onSuccess | onError | onComplete),也就是Maybe发射消息后,可能会回调的接口是onSuccess | onError | onComplete
public abstract class Maybe<T> implements MaybeSource<T>
public interface MaybeSource<T> {
/**
* Subscribes the given MaybeObserver to this MaybeSource instance.
* @param observer the MaybeObserver, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(MaybeObserver<? super T> observer);
}
public interface MaybeObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable e);
void onComplete();
}
5.3、Maybe示例
下面是个例子,注意让线程睡多少秒可以修改测试dispose,与Completable类似,但是无论怎么onStart()都会被回调,为什么onStart()都会被回调呢?可以看DisposableMaybeObserver源码,在订阅消息的时候就会首先回调onSubscribe,这个时候dispose还没有运行了,这个动作发生在订阅的时候,没有订阅何来取消订阅呢。
public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable {
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
onStart();
}
}
/**
* Called once the single upstream Disposable is set via onSubscribe.
*/
protected void onStart() {
}
@Override
public final boolean isDisposed() {
return s.get() == DisposableHelper.DISPOSED;
}
@Override
public final void dispose() {
DisposableHelper.dispose(s);
}
}
下面是实例的运行和结果
private void doMaybe() {
new Thread(new Runnable() {
@Override
public void run() {
Disposable d = Maybe.just("Hello World")
.delay(3, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onSuccess(String value) {
System.out.println("Success: " + value);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
d.dispose();
}
}).start();
}
运行结果是:
10-02 15:01:53.320 25573-25649/hq.demo.net I/System.out: Started
10-02 15:01:56.324 25573-25654/hq.demo.net I/System.out: Success: Hello World
如果把Thread.sleep(4000)修改为Thread.sleep(2000)运行结果是:
10-02 15:05:34.362 25840-25872/hq.demo.net I/System.out: Started
上面例子使用DisposableMaybeObserver通过subscribeWith在Maybe处订阅,并返回一个Disposable,可以通过Disposable调用dispose来取消订阅。当然我们也可以通过下面的方式来完成,但是无法取消订阅关系:
private void doMaybe() {
Maybe.just("Hello World")
.delay(3, TimeUnit.SECONDS, Schedulers.io())
.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Started");
}
@Override
public void onSuccess(String value) {
System.out.println("Success: " + value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
}
6、总结
Single、Completable、Maybe是简化的Observable,只是具有少部分功能:
Single:只能发射一条单一数据或者一条异常通知,不能发射完成通知,数据与通知只能发射一个,二选一
Completable:只能发射一条完成通知或者一条异常通知,不能发射数据,要么发射完成通知要么发射异常通知,二选一
Maybe:只能发射一条单一数据,和发射一条完成通知,或者一条异常通知,完成通知和异常通知二选一,只能在发射完成通知或异常通知之前发射数据,否则发射数据无效
7、参考资料
https://blog.csdn.net/weixin_36709064/article/details/82911270
Flowable背压支持