RxJava2五种被观察者及背压

RxJava五种被观察者为Flowable, Observable,Single, Completable, Maybe。

五种被观察者可通过toFlowable, toObservable,toSingle, toCompletable, toMaybe相互转换。

1、Flowable

1.1、Flowable简介

Flowable类,用于实现Reactive-Streams模式,并提供工厂方法,中间运算符以及使用反应式数据流的能力。

Reactive-Streams使用Flowable运行,Flowable实现了Publishers。因此,许多运算符直接接受Publishers,并允许与其他Reactive-Streams的实现进行直接交互操作

public abstract class Flowable<T> implements Publisher<T>

Flowable为操作符提供128个元素的默认缓冲区大小,可通过bufferSize() 方法获取,可通过系统参数rx2.buffer-size全局覆盖。但是大多数运算符都有重载,允许显式设置其内部缓冲区大小。

/** The default buffer size. */
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
}

/**
 * Returns the default internal buffer size used by most async operators.
 * <p>The value can be overridden via system parameter {@code rx2.buffer-size}
 * <em>before</em> the Flowable class is loaded.
 * @return the default internal buffer size.
 */
public static int bufferSize() {
    return BUFFER_SIZE;
}

1.2、Flowable官方图解


1)看到上图有点疑问,不是在说Flowable嘛,怎么图解里的说明是Observable呢?

2)其实在官方文档里面Flowable和Observable都使用的是上面这个图解,因此这两个类肯定是提供相似功能,既然是相似,那么这幅图就是他们的共性,那不同的地方是什么呢?

不同之处是:Flowable支持Backpressure,Observable不支持Backpressure;只有在需要处理背压问题时,才需要使用Flowable

由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题
所以,如果能够确定:
1、上下游运行在同一个线程中,
2、上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
3、上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用Flowable,以免影响性能。

类似于Observable,在使用Flowable时,也可以通过create操作符创建发射数据流,代码如下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
        .subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {   //onSubscribe回调的参数不是Disposable而是Subscription
                s.request(Long.MAX_VALUE);            //注意此处,暂时先这么设置
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("接收----> " + integer);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
                System.out.println("接收----> 完成");
            }
        });

运行结果如下:

System.out: 接收----> 1
System.out: 接收----> 2
System.out: 接收----> 3
System.out: 接收----> 完成

发射与处理数据流在形式上与Observable大同小异,发射器中均有onNext,onError,onComplete方法,订阅器中也均有onSubscribe,onNext,onError,onComplete方法。
但是在细节方面还是有三点不同:
一、create方法中多了一个BackpressureStrategy类型的参数。
二、订阅器Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription,多了行代码:s.request(Long.MAX_VALUE);
三、Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter

1.3、Backpressure

在通过create操作符创建Flowable时,多了一个BackpressureStrategy类型的参数,BackpressureStrategy是个枚举,源码如下:

/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

当上游发送数据的速度快于下游接收数据的速度,且运行在不同的线程中时,Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限为128

BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池缓存数据的策略。在源码FlowableCreate类中,可以看到五个泛型分别对应五个java类,通过代理模式对原始的发射器进行了包装:

Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

ERROR

对应于ErrorAsyncEmitter类,在其源码

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

        abstract void onOverflow();
    }

onOverflow方法中可以看到,在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。父类的onNext中,在判断get() != 0,即缓存池未满的情况下,才会让被代理类调用onNext方法。运行如下代码:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 1; i <= 129; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);            //注意此处,暂时先这么设置
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ignore) {
                }
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("接收----> 完成");
            }
        });

创建并通过Flowable发射129条数据,Subscriber的onNext方法睡10秒之后再开始接收,运行后会发现控制台打印如下异常:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)

如果将Flowable发射数据的条数改为128,则不会出现此异常。

DROP

对应于DropAsyncEmitter类,通过DropAsyncEmitter类和它父类NoOverflowBaseAsyncEmitter的源码

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    private static final long serialVersionUID = 8360058422307496563L;

    DropAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    void onOverflow() {
        // nothing to do
    }
}

可以看到,DropAsyncEmitter的onOverflow是个空方法,没有执行任何操作。所以在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据

存池中数据的清理,并不是Subscriber接收一条,便清理一条,而是存在一个延迟,等累积一段时间后统一清理一次。也就是Subscriber接收到第96条数据时,缓存池才开始清理数据,之后Flowable发射的数据才得以放入。如果数据处于缓存池存满的状态时,则被丢弃。

LATEST

对应于LatestAsyncEmitter
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。

BUFFER

Flowable处理背压的默认策略,对应于BufferAsyncEmitter

其部分源码为:

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
        private static final long serialVersionUID = 2427151001689639875L;
        final SpscLinkedArrayQueue<T> queue;
        . . . . . .
        final AtomicInteger wip;
        BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
            this.wip = new AtomicInteger();
        }
        . . . . . .
}

在其构造方法中可以发现,其内部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM

和使用Observable时一样,都会导致内存剧增,最后导致OOM,不同的是使用Flowable内存增长的速度要慢得多,那是因为基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。

MISSING

对应于MissingEmitter类,通过其源码:

static final class MissingEmitter<T> extends BaseEmitter<T> {


        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }

可以发现,在传递数据时

actual.onNext(t);

并没有对缓存池的状态进行判断,所以在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。

onBackpressureXXX背压操作符:Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略

onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST

例如代码

Flowable.range(0, 500)
        .onBackpressureDrop()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

Subscription

Subscription与Disposable均是观察者与可观察对象建立订阅状态后回调回来的参数,如同通过Disposable的dispose()方法可以取消Observer与Oberverable的订阅关系一样,通过Subscription的cancel()方法也可以取消Subscriber与Flowable的订阅关系
不同的是接口Subscription中多了一个方法request(long n),如上面代码中的:

s.request(Long.MAX_VALUE);   

Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。如果不显示调用request则默认下游的需求量为零,上游Flowable发射的数据不会交给下游Subscriber处理。

多次调用request,数字会累积。

上游并没有根据下游的实际需求,发送数据,而是能发送多少,就发送多少,不管下游是否需要。而且超出下游需求之外的数据,仍然放到了异步缓存池中。这点我们可以通过以下代码来验证:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 1; i < 130; i++) {
                    System.out.println("发射---->" + i);
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("接收------>" + integer);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("接收------>完成");
            }
        });

通过Flowable发射130条数据,通过s.request(1)设置下游的数据请求量为1条,设置缓存策略为BackpressureStrategy.ERROR,如果异步缓存池超限,会导致MissingBackpressureException异常。久违的异常出现了,所以超出下游需求之外的数据,仍然放到了异步缓存池中,并导致缓存池溢出。

那么上游如何才能按照下游的请求数量发送数据呢,虽然通过request可以设置下游的请求数量,但是上游并没有获取到这个数量,如何获取呢?这便需要用到Flowable与Observable的第三点区别,Flowable特有的发射器FlowableEmitter

FlowableEmitter

flowable的发射器FlowableEmitter与observable的发射器ObservableEmitter均继承自Emitter
比较两者源码可以发现:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(Disposable d);

    void setCancellable(Cancellable c);

    boolean isDisposed();

    ObservableEmitter<T> serialize();
}

public interface FlowableEmitter<T> extends Emitter<T> {

    void setDisposable(Disposable s);

    void setCancellable(Cancellable c);

    long requested();

    boolean isCancelled();

    FlowableEmitter<T> serialize();
}

接口FlowableEmitter中多了一个方法

long requested();

上游在发送数据的时候并不需要考虑下游需不需要,而只需要考虑异步缓存池中是否放得下,放得下便发,放不下便暂停。所以,通过e.requested()获取到的值,并不是下游真正的数据请求数量,而是异步缓存池中可放入数据的数量。数据放入缓存池中后,再由缓存池按照下游的数据请求量向下传递,待到传递完的数据累积到95条之后,将其清除,腾出空间存放新的数据。如果下游处理数据缓慢,则缓存池向下游传递数据的速度也相应变慢,进而没有传递完的数据可清除,也就没有足够的空间存放新的数据,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。

我们可以通过这个方法来获取当前未完成的请求数量,上游只需要在e.requested()等于0时,暂停发射数据,便可解决背压问题。

最终方案

下面,对其通过Flowable做些改进,让其既不会产生背压问题,也不会引起异常或者数据丢失。
代码如下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                int i = 0;
                while (true) {
                    if (e.requested() == 0) continue;//此处添加代码,让flowable按需发送数据
                    System.out.println("发射---->" + i);
                    i++;
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.MISSING)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription mSubscription;

            @Override
            public void onSubscribe(Subscription s) {
                s.request(1);            //设置初始请求数据量为1
                mSubscription = s;
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(50);
                    System.out.println("接收------>" + integer);
                    mSubscription.request(1);//每接收到一条数据增加一条请求量
                } catch (InterruptedException ignore) {
                }
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });

下游处理数据的速度Thread.sleep(50)赶不上上游发射数据的速度,不同的是,我们在下游onNext(Integer integer) 方法中,每接收一条数据增加一条请求量,

mSubscription.request(1)

在上游添加代码

if(e.requested()==0)continue;

让上游按需发送数据,上游严格按照下游的需求量发送数据,不会产生MissingBackpressureException异常,或者丢失数据。

2、Observable

2.1、Observable简介

Observable类是不支持背压的,Observable是Reactive的一个抽象基类,Observable提供工厂方法,中间运算符以及消费同步和/或异步数据流的功能。

Observable类中的多数运算符接受一个或者多个ObservableSource,ObservableSource是非背压的基本接口,Observable实现了这个接口。

public abstract class Observable implements ObservableSource
默认情况下,Observable的为其运算符提供128个元素的缓冲区大小运行,可看考Flowable.bufferSize(),可以通过系统参数rx2.buffer-size全局覆盖。但是,大多数运算符都有重载,允许设置其内部缓冲区大小。

2.2、Flowable和Observable对比

在上面已经说明了二者最大的区别。

官方也给出的解释是:

The design of this class was derived from the Reactive-Streams design and specification by removing any backpressure-related infrastructure and implementation detail, replacing the org.reactivestreams.Subscription with Disposable as the primary means to dispose of a flow.

中文翻译:

该类的设计源自Reactive-Streams设计和规范,通过删除任何与背压相关的基本结构和实现细节,将Disposable替换为org.reactivestreams.Subscription作为处理流的主要方式。

从代码层面上做简单的说明,Flowable实现了Publisher接口,Publisher源码如下:

public interface Publisher<T> {

    /**
     * Request {@link Publisher} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * <p>
     * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
     * signal the error via {@link Subscriber#onError}.
     *
     * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
     */
    public void subscribe(Subscriber<? super T> s);
}

Observable实现了ObservableSource接口,ObservableSource源码如下

1
2
3
4
5
6
7
8
9
public interface ObservableSource<T> {

/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}

对比ObservableSource和Publisher,都有一个同名的接口subscribe()

2.4、形象理解ObservableSource和Publisher有何异同

ObservableSource:可观察源

Publisher:发布者

subscribe:订阅

Subscriber:订阅者

Observer:观察者

对于ObservableSource,可以将subscribe(Observer observer)理解为Observer通过subscribe订阅了ObservableSource

对于Publisher,可以将subscribe(Subscriber s)理解为Subscriber通过subscribe订阅了Publisher

上面的解释可能比较抽象,通俗的举个例子,来个角色扮演

第一组:报刊(ObservableSource)、报刊订阅者(Observer)、订阅报刊的行为(subscribe)

第二组:报刊发布人(Publisher)、报刊订阅者(Subscriber)、订阅报刊的行为(subscribe)

把这个场景串起来讲就是:报刊订阅者订阅了报刊,或者说报刊订阅者在报刊发布人手中订阅了报刊。

这其实是典型的观察者模式,所不同的是信息的发布者是ObservableSource还是Publisher,信息的订阅者是Observer还是Subscriber,统一的行为都是subscribe。

3、Single

3.1、Single简介

public abstract class Single<T> implements SingleSource<T> 

Single实现了SingleSource

/**
 * Represents a basic {@link Single} source base interface,
 * consumable via an {@link SingleObserver}.
 * <p>
 * This class also serves the base type for custom operators wrapped into
 * Single via {@link Single#create(SingleOnSubscribe)}.
 *
 * @param <T> the element type
 * @since 2.0
 */
public interface SingleSource<T> {

    /**
     * Subscribes the given SingleObserver to this SingleSource instance.
     * @param observer the SingleObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(SingleObserver<? super T> observer);
}

Single类为单个值响应实现Reactive Pattern。

Single和Observable类似,所不同的是Single只能发出一个值,要么发射成功要么发射失败,也没有“onComplete”作为完成时的回调

Single类实现了基类SingleSource的接口,SingleObserver作为Single发出来的消息的默认消费者,SingleObserver通过subscribe(SingleObserver<? super T> observer)在Single中订阅消息

public interface SingleObserver<T> {

    /**
     * Provides the SingleObserver with the means of cancelling (disposing) the
     * connection (channel) with the Single in both
     * synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(Disposable d);

    /**
     * Notifies the SingleObserver with a single item and that the {@link Single} has finished sending
     * push-based notifications.
     * <p>
     * The {@link Single} will not call this method if it calls {@link #onError}.
     *
     * @param value
     *          the item emitted by the Single
     */
    void onSuccess(T value);

    /**
     * Notifies the SingleObserver that the {@link Single} has experienced an error condition.
     * <p>
     * If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
     *
     * @param e
     *          the exception encountered by the Single
     */
    void onError(Throwable e);
}

3.2、Single与Observable的区别

Single只能发送单个消息,不能发送消息流,而且观察者接收到消息也只有两种情况,要么接收成功,要么接收失败

3.3、Single官方图解

4、Completable

4.1、Completable简介

Completable类表示延迟计算,没有任何值,只表示完成或异常

Completable的行为类似于Observable,在计算完成后只能发出完成或错误信号,由onComplete或onError接口来处理,没有onNext或onSuccess等回调接口

Completable实现了基类CompletableSource的接口,CompletableObserver通过subscribe()方法在Completable处订阅消息。

Completable遵循协议:onSubscribe (onComplete | onError)

public abstract class Completable implements CompletableSource


public interface CompletableSource {

    /**
     * Subscribes the given CompletableObserver to this CompletableSource instance.
     * @param cs the CompletableObserver, not null
     * @throws NullPointerException if {@code cs} is null
     */
    void subscribe(CompletableObserver cs);
}


public interface CompletableObserver {

    void onSubscribe(Disposable d);

    void onComplete();

    void onError(Throwable e);
}

从源码中我们可以看到CompletableObserver里面有三个接口:

1)onSubscribe中传入参数Disposable,由Completable调用一次,在CompletableObserver实例上设置Disposable,然后可以随时取消订阅。

2)onComplete一旦延迟计算正常完成将会被调用

3)onError 一旦延迟计算抛出异常将会被调用

4.3、Completable示例

注意: 通过Disposable调用dispose()取消订阅,后面的消息无法接收。

运行下面的例子,可以看到在调用dispose后,onStart被回调后,后续的消息就收不到了;去掉dispose,onStart回调后,三秒后onComplete将会被回调

 private void doCompletable() {
        Disposable d = Completable.complete()
                .delay(3, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableCompletableObserver() {
                    @Override
                    public void onStart() {
                        System.out.println("Started");
                    }

                    @Override
                    public void onError(Throwable error) {
                        error.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete!");
                    }
                });


        d.dispose();

    }

运行完毕的结果是:
10-02 11:10:34.797 15565-15565/hq.demo.net I/System.out: Started

注释d.dispose()后在运行结果是:
10-02 14:34:19.490 23232-23232/hq.demo.net I/System.out: Started
10-02 14:34:22.492 23232-23483/hq.demo.net I/System.out: Done!

上面使用的是DisposableCompletableObserver通过subscribeWith来订阅消息,返回一个Disposable可以通过dispose来取消订阅关系,DisposableCompletableObserver是CompletableObserve的子类,只是增加了可取消订阅的功能。当然也能通过CompletableObserve方法操作,但是无法取消订阅关系,除此外没什么本质区别。

5、Maybe

5.1、Maybe简介

Maybe类表示延迟计算和单个值的发射,这个值可能根本没有或异常。

Maybe类实现MaybeSource的接口,MaybeObserver通过subscribe(MaybeObserver)在Maybe处订阅消息

Maybe遵循协议:onSubscribe (onSuccess | onError | onComplete),也就是Maybe发射消息后,可能会回调的接口是onSuccess | onError | onComplete

public abstract class Maybe<T> implements MaybeSource<T>

public interface MaybeSource<T> {

    /**
     * Subscribes the given MaybeObserver to this MaybeSource instance.
     * @param observer the MaybeObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(MaybeObserver<? super T> observer);
}


public interface MaybeObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable e);
    void onComplete();
}

5.3、Maybe示例

下面是个例子,注意让线程睡多少秒可以修改测试dispose,与Completable类似,但是无论怎么onStart()都会被回调,为什么onStart()都会被回调呢?可以看DisposableMaybeObserver源码,在订阅消息的时候就会首先回调onSubscribe,这个时候dispose还没有运行了,这个动作发生在订阅的时候,没有订阅何来取消订阅呢。

public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable {
    final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this.s, s)) {
            onStart();
        }
    }

    /**
     * Called once the single upstream Disposable is set via onSubscribe.
     */
    protected void onStart() {
    }

    @Override
    public final boolean isDisposed() {
        return s.get() == DisposableHelper.DISPOSED;
    }

    @Override
    public final void dispose() {
        DisposableHelper.dispose(s);
    }
}

下面是实例的运行和结果

private void doMaybe() {

        new Thread(new Runnable() {
            @Override
            public void run() {
                Disposable d = Maybe.just("Hello World")
                        .delay(3, TimeUnit.SECONDS, Schedulers.io())
                        .subscribeWith(new DisposableMaybeObserver<String>() {
                            @Override
                            public void onStart() {
                                System.out.println("Started");
                            }

                            @Override
                            public void onSuccess(String value) {
                                System.out.println("Success: " + value);
                            }

                            @Override
                            public void onError(Throwable error) {
                                error.printStackTrace();
                            }

                            @Override
                            public void onComplete() {
                                System.out.println("Done!");
                            }
                        });

                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                d.dispose();
            }
        }).start();

    }


运行结果是:
10-02 15:01:53.320 25573-25649/hq.demo.net I/System.out: Started
10-02 15:01:56.324 25573-25654/hq.demo.net I/System.out: Success: Hello World

如果把Thread.sleep(4000)修改为Thread.sleep(2000)运行结果是:
10-02 15:05:34.362 25840-25872/hq.demo.net I/System.out: Started

上面例子使用DisposableMaybeObserver通过subscribeWith在Maybe处订阅,并返回一个Disposable,可以通过Disposable调用dispose来取消订阅。当然我们也可以通过下面的方式来完成,但是无法取消订阅关系:

private void doMaybe() {
    Maybe.just("Hello World")
            .delay(3, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(new MaybeObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("Started");
                }

                @Override
                public void onSuccess(String value) {
                    System.out.println("Success: " + value);
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("Done!");
                }
            });

    }

6、总结

Single、Completable、Maybe是简化的Observable,只是具有少部分功能:

Single:只能发射一条单一数据或者一条异常通知,不能发射完成通知,数据与通知只能发射一个,二选一
Completable:只能发射一条完成通知或者一条异常通知,不能发射数据,要么发射完成通知要么发射异常通知,二选一
Maybe:只能发射一条单一数据,和发射一条完成通知,或者一条异常通知,完成通知和异常通知二选一,只能在发射完成通知或异常通知之前发射数据,否则发射数据无效

7、参考资料

https://blog.csdn.net/weixin_36709064/article/details/82911270
Flowable背压支持