RxJava2操作符汇总

RxJava是一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库(a library for composing asynchronous and event-based programs using observable sequences for the Java VM)。

RxJava能帮助我们在实现异步执行的前提下保持代码的清晰。它的原理就是创建一个Observable来完成异步任务,组合使用各种不同的链式操作,来实现各种复杂的操作,最终将任务的执行结果发射给Observer进行处理。

1、简介

RxJava 有以下三个基本的元素:
1.被观察者(Observable)
2.观察者(Observer)
3.订阅(subscribe)

1.创建被观察者:

Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});

2.创建观察者:

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
};

3.订阅

observable.subscribe(observer);

也可以链式调用

RxJava 中的调度器

调度器 作用
Schedulers.computation( ) 用于使用计算任务,如事件循环和回调处理
Schedulers.immediate( ) 当前线程
Schedulers.io( ) 用于 IO 密集型任务,如果异步阻塞 IO 操作
Schedulers.newThread( ) 创建一个新的线程
AndroidSchedulers.mainThread() Android 的 UI 线程,用于操作 UI

内存泄漏

  • 每次掉用过onError或onComplete其中一个方法后,就会掉用dispose()方法解除订阅
  • CompositeDisposable可以容纳多个disposable,每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可快速解除所有添加的Disposable类.

2、创建操作符

create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

作用:创建一个被观察者

just()

public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

作用:创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

示例如下:

Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

看看打印结果:
=================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 

fromArray()

public static <T> Observable<T> fromArray(T... items)

作用:这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

fromIterable()

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

作用:这个方法和 fromArray() 类似,直接发送一个 List 集合数据给观察者

fromCallable() & fromRunnalbe()

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

作用:这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

Observable.fromCallable()类似于:

Observable.defer {
    try {
        Observable.just(...)
    } catch(e: Throwable) {
        Observable.error(e)
    }
}

因此,just为运行同步,而fromCallable可以被推迟到另一个Scheduler与subscribeOn(“后”和执行)。

示例如下:

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});

fromAction()

rxjava-async模块还包含一个fromAction操作符,它接受一个Action作为参数,返回一个Observable,一旦Action终止,它发射这个你传递给fromAction的数据。

Maybe、Completable 专用,相当于执行完成 Action 中的代码并且调用 onComplete,很方便。

fromFuture()

public static <T> Observable<T> fromFuture(Future<? extends T> future)

作用:参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

示例如下:

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
    @Override
    public String call() throws Exception {
        Log.d(TAG, "CallableDemo is Running");
        return "返回结果";
    }
});

Observable.fromFuture(futureTask)
    .doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        futureTask.run();
    }
})
.subscribe(new Consumer < String > () {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "================accept " + s);
    }
});

doOnSubscribe() 的作用就是只有订阅时才会发送事件。

defer()

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

作用:这个方法的作用就是直到被观察者被订阅后才会创建被观察者。

示例如下:

// i 要定义为成员变量
Integer i = 100;

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

i = 200;

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);

打印结果如下:

================onNext 200
================onNext 300

因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。

timer()

public static Observable<Long> timer(long delay, TimeUnit unit) 

作用:当到指定时间后就会发送一个 0L 的值给观察者。

示例如下:

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "===============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

interval()

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

作用:每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

示例如下:

Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印结果:
05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5

从时间就可以看出每隔4秒就会发出一次数字递增1的事件。这里说下 interval() 第三个方法的 initialDelay 参数,这个参数的意思就是 onSubscribe 回调之后,再次回调 onNext 的间隔时间。

intervalRange()

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

作用:可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

示例如下:

Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
打印结果:
05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6

可以看出收到5次 onNext 事件,并且是从 2 开始的。

range()

public static Observable<Integer> range(final int start, final int count)

作用:同时发送一定范围的事件序列。

示例如下:

Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印结果:
==============onSubscribe 
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6

rangeLong()

public static Observable<Long> rangeLong(long start, long count)

作用:与 range() 一样,只是数据类型为 Long

empty() & never() & error()

public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)

作用:
1.empty() : 直接发送 onComplete() 事件
2.never():不发送任何事件
3.error():发送 onError() 事件

示例如下:

Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "==================onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});

打印结果:

==================onSubscribe
==================onComplete

换成 never() 的打印结果:

==================onSubscribe

换成 error() 的打印结果:

==================onSubscribe
==================onError java.lang.NullPointerException

amb()

public static Observable amb(Iterable<? extends ObservableSource<? extends T>> sources)

作用:amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃

示例如下:

ArrayList < Observable < Long >> list = new ArrayList < > ();
list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "========================aLong " + aLong);
    }
});
打印结果:
05-26 10:21:29.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 6
05-26 10:21:30.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 7
05-26 10:21:31.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 8
05-26 10:21:32.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 9
05-26 10:21:33.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 10

3、转换操作符

map()

map可以将被观察者发送的数据类型转变成其他的类型

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

以下代码将 Integer 类型的数据转换成 String:

Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return "I'm " + integer;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "===================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印结果:
===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3

flatMap()

这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

现在有一个需求就是要将 Person 集合中的每个元素中的 Plan 的 action 打印出来。首先用 map() 来实现这个需求看看:

Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
    @Override
    public List < Plan > apply(Person person) throws Exception {
        return person.getPlanList();
    }
})
.subscribe(new Observer < List < Plan >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Plan > plans) {
        for (Plan plan: plans) {
            List < String > planActionList = plan.getActionList();
            for (String action: planActionList) {
                Log.d(TAG, "==================action " + action);
            }
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

可以看到 onNext() 用了嵌套 for 循环来实现,如果代码逻辑复杂起来的话,可能需要多重循环才可以实现。现在看下使用 flatMap() 实现:

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================action: " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

从代码可以看出,只需要两个 flatMap() 就可以完成需求,并且代码逻辑非常清晰。

concatMap()

concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,不管是否延时,而 flatMap() 是无序的。

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)

buffer()

从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

public final Observable<List<T>> buffer(int count, int skip)

buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。这样说可能还是有点抽象,直接看代码:

Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Integer > integers) {
        Log.d(TAG, "================缓冲区大小: " + integers.size());
        for (Integer i: integers) {
            Log.d(TAG, "================元素: " + i);
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
打印结果:
================缓冲区大小: 2
================元素: 1
================元素: 2
================缓冲区大小: 2
================元素: 2
================元素: 3
================缓冲区大小: 2
================元素: 3
================元素: 4
================缓冲区大小: 2
================元素: 4
================元素: 5
================缓冲区大小: 1
================元素: 5

从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。

scan()

将数据以一定的逻辑聚合起来。

public final Observable<T> scan(BiFunction<T, T, T> accumulator)

示例如下:

Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        Log.d(TAG, "====================apply ");
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        return integer + integer2;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
打印结果:
====================accept 1
====================apply 
====================integer 1
====================integer2 2
====================accept 3
====================apply 
====================integer 3
====================integer2 3
====================accept 6
====================apply 
====================integer 6
====================integer2 4
====================accept 10
====================apply 
====================integer 10
====================integer2 5
====================accept 15

groupBy()

将发送的数据进行分组,每个分组都会返回一个被观察者。

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

示例如下:

Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 3;
    }
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "====================onSubscribe ");
    }

    @Override
    public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
        Log.d(TAG, "====================onNext ");
        integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "====================GroupedObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "====================GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "====================GroupedObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "====================GroupedObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "====================onComplete ");
    }
});

window()

发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。

public final Observable<Observable<T>> window(long count)

示例如下:

Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=====================onSubscribe ");
    }

    @Override
    public void onNext(Observable < Integer > integerObservable) {
        integerObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "=====================integerObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "=====================integerObservable onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "=====================integerObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "=====================integerObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=====================onComplete ");
    }
});
打印结果:
=====================onSubscribe 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 5
=====================integerObservable onComplete 
=====================onComplete 

从结果可以发现,window() 将 1~5 的事件分成了3组。

compose()

对Observable进行变换,加工处理。

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer)

示例如下:

Observable.just(1, 2, 3, 4, 5)
          .compose(new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> observable) { //比如给observable添加subscribeOn、observeOn
                return observable.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            })
      .subscribe();

4、功能操作符

subscribeOn()

指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。如果不指定,则默认为主线程。

public final Observable<T> subscribeOn(Scheduler scheduler)

observeOn()

指定观察者的线程,可以切换线程,每指定一次就会生效一次。

public final Observable<T> observeOn(Scheduler scheduler)

示例如下:

Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Integer integer) throws Exception {
        Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
        return Observable.just("chan" + integer);
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
        Log.d(TAG, "======================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
打印结果:
======================onSubscribe
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
======================onComplete

从打印结果可以知道,observeOn 成功切换了线程。

delay()

延迟一段事件发送事件, onSubscribe 回调之后 onNext 延时后才会回调。

public final Observable<T> delay(long delay, TimeUnit unit)

doOnEach()

Observable 每发送一件事件之前都会先回调这个方法。

public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)

示例如下:

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        //      e.onError(new NumberFormatException());
        e.onComplete();
    }
})
.doOnEach(new Consumer < Notification < Integer >> () {
    @Override
    public void accept(Notification < Integer > integerNotification) throws Exception {
        Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete 

从结果就可以看出每发送一个事件之前都会回调 doOnEach 方法,并且可以取出 onNext() 发送的值。

doOnNext()

Observable 每发送 onNext() 之前都会先回调这个方法。

public final Observable<T> doOnNext(Consumer<? super T> onNext)

doAfterNext()

Observable 每发送 onNext() 之后都会回调这个方法。

public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)

doOnComplete()

Observable 每发送 onComplete() 之前都会回调这个方法。

public final Observable<T> doOnComplete(Action onComplete)

doOnError()

Observable 每发送 onError() 之前都会回调这个方法。

public final Observable<T> doOnError(Consumer<? super Throwable> onError)

doOnSubscribe()

Observable 每发送 onSubscribe() 之前都会回调这个方法。

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)

doOnDispose()

当调用 Disposable 的 dispose() 之后回调该方法。

public final Observable<T> doOnDispose(Action onDispose)

doOnLifecycle()

在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。第二个参数的回调方法的作用与 doOnDispose() 是一样的。

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)

示例如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "==================doOnLifecycle accept");
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnLifecycle Action");
    }
})
.doOnDispose(
    new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnDispose Action");
        }
})
.subscribe(new Observer<Integer>() {
    private Disposable d;
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }

});
打印结果:
 ==================doOnLifecycle accept
==================onSubscribe 
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action

可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。
如果使用 doOnLifecycle 进行取消订阅,来看看打印结果:

==================doOnLifecycle accept
==================onSubscribe 

可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。

doOnTerminate() & doAfterTerminate()

doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。

public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)

doFinally()

在所有事件发送完毕之后回调该方法。

public final Observable<T> doFinally(Action onFinally)

这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后

onErrorReturn()

当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。

public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)

示例如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorReturn " + throwable);
        return 404;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete 

onErrorResumeNext()

当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。

public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)

示例如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorResumeNext " + throwable);
        return Observable.just(4, 5, 6);
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete 

onExceptionResumeNext()

与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。

public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)

retry()

如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。

public final Observable<T> retry(long times)

retryUntil()

出现错误事件之后,可以通过此方法判断是否继续发送事件。指示Observable遇到错误时,是否让Observable重新订阅。

public final Observable<T> retryUntil(final BooleanSupplier stop)

示例如下:

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
        if (i == 6) {
            return true;
        }
        return false;
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onError 

retryWhen()

当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。

public final void safeSubscribe(Observer<? super T> s)

示例如下:

Observable.create(new ObservableOnSubscribe < String > () {
    @Override
    public void subscribe(ObservableEmitter < String > e) throws Exception {
        e.onNext("chan");
        e.onNext("ze");
        e.onNext("de");
        e.onError(new Exception("404"));
        e.onNext("haha");
    }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                    return Observable.just("可以忽略的异常");
                } else {
                    return Observable.error(new Throwable("终止啦"));
                }
            }
        });
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext chan
==================onNext ze
==================onNext de
==================onError java.lang.Throwable: 终止啦

将 onError(new Exception("404")) 改为 onError(new Exception("303")) 看看打印结果:
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
从结果可以看出,会不断重复发送消息。

repeat()

重复发送被观察者的事件,times 为发送次数。

public final Observable<T> repeat(long times)

repeatWhen()

这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。

public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

repeatWhen可以实现重订阅功能,而触发重订阅两个关键因素:
1.Obervable完成一次订阅,就是Observable调用onComplete
2.当Observable调用onComplete就会进入到repeatWhen方法里面,是否要触发重订阅,就需要通过repeatWhen的Function方法所返回的ObservableSource确定,如果返回的是onNext则触发重订阅,而返回的是onComplete/onError则不会触发重订阅

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onComplete();
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "触发重订阅");
            }
        }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {

            private int n = 0;

            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Object o) throws Exception {
                        if (n != 3) {
                            n++;
                            return Observable.timer(3, TimeUnit.SECONDS);
                        } else {
                            return Observable.empty();
                        }
                    }
                });
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

运行结果:
08-02 10:26:13.187 D/MainActivity-vv: 触发重订阅
08-02 10:26:16.196 D/MainActivity-vv: 触发重订阅
08-02 10:26:19.204 D/MainActivity-vv: 触发重订阅
08-02 10:26:22.206 D/MainActivity-vv: 触发重订阅

repeatWhen可以用于延时轮询,在doOnComplete进行操作

5、组合操作符

concat()

可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

示例如下:

Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
打印如下:
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8

concatArray()

与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。

public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)

merge() & mergeArray()

这个方法与 concat() 作用基本一样,只是 concat() 是串行发送事件,而 merge() 并行发送事件

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

现在来演示 concat() 和 merge() 的区别:

Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "A" + aLong;
    }
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "B" + aLong;
    }
}))
    .subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "=====================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
打印结果如下:
05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
......
从结果可以看出,A 和 B 的事件序列都可以发出,将以上的代码换成 concat() 看看打印结果:
05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
......
从结果可以知道,只有等到第一个被观察者发送完事件之后,第二个被观察者才会发送事件。

mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者。

concatArrayDelayError() & mergeArrayDelayError()

在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

示例如下:

Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onError(new NumberFormatException());
    }
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {

    }
});
打印结果如下:
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError 

从结果可以看到,onError 事件是在所有被观察者发送完事件才发送的。mergeArrayDelayError() 也是有同样的作用。

zip()

会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。

public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)

示例如下:

Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
    .map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            String s1 = "A" + aLong;
            Log.d(TAG, "===================A 发送的事件 " + s1);
            return s1;
        }}),
        Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
            .map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s2 = "B" + aLong;
                Log.d(TAG, "===================B 发送的事件 " + s2);
                return s2;
            }
        }),
        new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                String res = s + s2;
                return res;
            }
        })
.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:
05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
===================onNext A1B1
05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
===================onNext A2B2
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
===================onComplete 

可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。

combineLatest() & combineLatestDelayError()

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)

combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码:

Observable.combineLatest(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
    .map(new Function < Long, String > () {@Override
    public String apply(Long aLong) throws Exception {
        String s1 = "A" + aLong;
        Log.d(TAG, "===================A 发送的事件 " + s1);
        return s1;
    }
}),
Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
    .map(new Function < Long, String > () {@Override
    public String apply(Long aLong) throws Exception {
        String s2 = "B" + aLong;
        Log.d(TAG, "===================B 发送的事件 " + s2);
        return s2;
    }
}),
new BiFunction < String, String, String > () {@Override
    public String apply(String s, String s2) throws Exception {
        String res = s + s2;
        return res;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "===================最终接收到的事件 " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔2秒发送一次事件。来看看打印结果:
05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 
05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A2B1
05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
===================最终接收到的事件 A3B1
05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B1
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B2
05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B3
05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B4
05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B5
===================onComplete 

分析上述结果可以知道,当发送 A1 事件之后,因为 B 并没有发送任何事件,所以根本不会发生结合。当 B 发送了 B1 事件之后,就会与 A 最近发送的事件 A2 结合成 A2B1,这样只有后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。

因为 combineLatestDelayError() 就是多了延迟发送 onError() 功能,这里就不再赘述了。

reduce()

与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

示例如下:
Observable.just(0, 1, 2, 3)
.reduce(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int res = integer + integer2;
Log.d(TAG, “====================integer “ + integer);
Log.d(TAG, “====================integer2 “ + integer2);
Log.d(TAG, “====================res “ + res);
return res;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, “==================accept “ + integer);
}
});
打印结果:
====================integer 0
====================integer2 1
====================res 1
====================integer 1
====================integer2 2
====================res 3
====================integer 3
====================integer2 3
====================res 6
==================accept 6
从结果可以看到,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止。

collect()

将数据收集到数据结构当中。

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

示例如下:

Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
    @Override
    public ArrayList < Integer > call() throws Exception {
        return new ArrayList < > ();
    }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
    @Override
    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
        integers.add(integer);
    }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
    @Override
    public void accept(ArrayList < Integer > integers) throws Exception {
        Log.d(TAG, "===============accept " + integers);
    }
});
打印结果:
===============accept [1, 2, 3, 4]

startWith() & startWithArray()

在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。

public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)

示例如下:

Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});
打印结果:
================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7

count()

返回被观察者发送事件的数量。

public final Single<Long> count()

示例如下:

Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "=======================aLong " + aLong);
    }
});
打印结果:
=======================aLong 3

6、过滤操作符

filter()

通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
public final Observable filter(Predicate<? super T> predicate)

示例如下:

 Observable.just(1, 2, 3)
    .filter(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 2;
        }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
以上代码只有小于2的事件才会发送,来看看打印结果:
==================onSubscribe 
==================onNext 1
==================onComplete 

ofType()

过滤不符合该类型事件

public final <U> Observable<U> ofType(final Class<U> clazz)

示例如下:

Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 

skip() & skipLast()

跳过正序某些事件,count 代表跳过事件的数量

public final Observable<T> skip(long count)

skipLast() 作用也是跳过某些事件,不过它是用来跳过正序的后面的事件。

distinct()

过滤事件序列中的重复事件。

public final Observable<T> distinct() 

distinctUntilChanged()

过滤掉连续重复的事件

public final Observable<T> distinctUntilChanged()

示例如下:

Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 2
==================onNext 1
==================onComplete 

因为事件序列中连续出现两次3,所以第二次3并不会发出。

take() & takeLast()

控制观察者接收的事件的数量。

public final Observable<T> take(long count)

示例如下:

Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
打印结果:
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 

takeLast() 的作用就是控制观察者只能接受事件序列的后面几件事情。

debounce()

如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。

public final Observable<T> debounce(long timeout, TimeUnit unit)

throttleWithTimeout() 与此方法的作用一样。

firstElement() && lastElement()

firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。

public final Maybe<T> firstElement()
public final Maybe<T> lastElement()

elementAt() & elementAtOrError()

elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError() 。

public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)

示例如下:

Observable.just(1, 2, 3, 4)
.elementAt(0)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
打印结果:
====================accept 1

将 elementAt() 的值改为5,这时是没有打印结果的,因为没有满足条件的元素。
替换 elementAt() 为 elementAtOrError(),代码如下:

Observable.just(1, 2, 3, 4)
.elementAtOrError(5)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});

这时候会抛出 NoSuchElementException 异常。

7、条件操作符

all()

判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。

public final Single<Boolean> all(Predicate<? super T> predicate)

示例如下:

Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 5;
    }
})
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "==================aBoolean " + aBoolean);
    }
});
打印结果:
==================aBoolean true

takeWhile()

可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送

public final Observable<T> takeWhile(Predicate<? super T> predicate)

示例如下:

Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});
打印结果:
========================integer 1
========================integer 2

skipWhile()

可以设置条件,当某个数据满足条件时不发送该数据,反之则发送。

public final Observable<T> skipWhile(Predicate<? super T> predicate)

takeUntil()

可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。

public final Observable<T> takeUntil(Predicate<? super T> stopPredicate

示例如下:

Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer > 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});
打印结果:
========================integer 1
========================integer 2
========================integer 3
========================integer 4

skipUntil()

skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。skipUntil() 里的 Observable 并不会发送事件给观察者。

public final <U> Observable<T> skipUntil(ObservableSource<U> other)

示例如下:

Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "========================onSubscribe ");
    }

    @Override
    public void onNext(Long along) {
        Log.d(TAG, "========================onNext " + along);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "========================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "========================onComplete ");
    }
});
打印结果:
05-26 10:08:50.574 13023-13023/com.example.rxjavademo D/chan: ========================onSubscribe 
05-26 10:08:53.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 4
05-26 10:08:54.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 5
========================onComplete 

sequenceEqual()

判断两个 Observable 发送的事件是否相同。

public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

contains()

判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。

public final Single<Boolean> contains(final Object element)

isEmpty()

判断事件序列是否为空。

public final Single<Boolean> isEmpty()

defaultIfEmpty()

如果观察者只发送一个 onComplete() 事件,这个方法会发送一个值。

public final Observable<T> defaultIfEmpty(T defaultItem)

示例如下:

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================onNext " + integer);
    }
});
打印结果:
========================onNext 666

参考资料

https://juejin.im/post/5b17560e6fb9a01e2862246f
https://gank.io/post/560e15be2dca930e00da1083