本篇主要对以下三个方面进行讲解:
- RxJava是流式编程,在每一条流中,都至少包含三个要素:源头/被订阅者(Observable或Flowable)、订阅者(Observer或subscriber)、触发时机(subscribe()方法);
- 其次就是线程切换(subscribeOn()和observeOn());
- 最后就是数据操作(如map()、flatMap()等)。
1、订阅(subscribe)
首先看下最简单的情况:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// doSomething, eg:
// emitter.onNext("onNext");
// emitter.onComplete();
}
}).subscribe();
一条RxJava流若是没有调用subscribe()方法,该流便无法执行,即必须由subscribe()确定了订阅关系后这条流才能生效,原因如下:
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
// 可以直接忽略RxJavaPlugins的相关方法,不影响我们理解原理
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// 无论调用subscribe的哪个重载方法,最终都会走到这个方法
public final void subscribe(Observer<? super T> observer) {
... // 省去不重要代码
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
可以看到subscribe()里面主要是调用了subscribeActual,而subscribeActual是一个抽象方法,所以具体实现在子类中,这里的子类便是ObservableCreate,再来看它的实现:
// ObservableCreate.java
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ObservableCreate中subscribeActual的实现就是将我们的observer封装成CreateEmitter(ObservableEmitter的实现类),再执行observer.onSubscribe,确保onSubscribe总能在onNext等其他订阅行为之前执行,接着就是我们的核心代码:source.subscribe(parent),source便是我们一开始创建流时新建的ObservableOnSubscribe对象,而parent则是封装后的CreateEmitter,所以其实此时执行的便是在创建ObservableOnSubscribe时实现的public void subscribe(ObservableEmitter
现在我们知道,事件流的执行实际上是由子类实现的subscribeActual控制的,所以其他的Observable创建方式也是一样的道理,这里再以fromIterable为例看一下:
// Observable.java
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}
// ObservableFromIterable.java
public ObservableFromIterable(Iterable<? extends T> source) {
this.source = source;
}
@Override
public void subscribeActual(Observer<? super T> s) {
Iterator<? extends T> it;
try {
it = source.iterator();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
boolean hasNext;
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
if (!hasNext) {
EmptyDisposable.complete(s);
return;
}
FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
s.onSubscribe(d);
if (!d.fusionMode) {
d.run();
}
}
可以看到,这里是将observer和我们的数据源列表封装为FromIterableDisposable,然后执行d.run(),下面看下run的实现:
// FromIterableDisposable.java
FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
this.actual = actual;
this.it = it;
}
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
actual.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
actual.onComplete();
}
}
在run方法中不停地遍历数据源列表,然后根据实际情况执行对应的事件处理方法(actual.onNext(v);等,actual即为我们传进来的observer),这便完成了RxJava流的处理。
总结:RxJava2的订阅原理其实便是在subcribe时执行子类中实现的subscribeActual方法,该方法最终会去调用observer相关的订阅方法,可理解为观察者模式的一种变形。
2、线程切换
subscribeOn
// Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
subscribeOn时其实也是创建了一个Observable子类去实现subscribeActual方法:
// ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里将下游的Observer s封装成SubscribeOnObserver后又封装成Runnable的实现类SubscribeTask,在run方法中source.subscribe(parent):
// ObservableSubscribeOn.java
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
然后通过scheduler.scheduleDirect(new SubscribeTask(parent))将这个Task放到Worker中:
// Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public abstract Worker createWorker();
而createWorker由Scheduler子类实现,即我们执行的线程类型,如AndroidSchedulers.mainThread()、Schedulers.io(),这里以mainThread()为例,
// AndroidSchedulers.java
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
RxAndroidPlugins里面的非常简单,这里最终返回的就是HandlerScheduler,因此来看下它实现的createWorker和schedule方法:
// HandlerScheduler.java
public Worker createWorker() { return new HandlerWorker(handler); }
在HandlerWorker中实现schedule(task, delay, unit):
// HandlerWorker.java
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
....
return scheduled;
}
这里将handler(由上面创建DEFAULT Scheduler时可知,该handler为UI线程的handler)和run又再次封装为ScheduledRunnable,然后通过handler发送到主线程处理,因此便保证了订阅操作(source.subscribe(parent))执行在主线程。
同时,由上面可知,rx流由subscribe开始触发,然后执行source.subscribe(observer),然而source可能也是上游操作后的产物(如map),因此便会触发上游内部的subscribe,直到源头,即rx流由subscribe开始触发,然后逆向向上寻找直到源头,才开始真正的执行。因此,若是有多个subscribeOn,最终的subscribe也是被放到最上面的subscribeOn(即第一个)指定的线程中执行,这便是指定多个subscribeOn只有第一个生效的原因。
总结:其实就是将订阅操作放到Runnable中执行,并结合handler机制将Runnable发送到主线程,对于其他线程(不同的线程模式会创建不同的Scheduler,并持有对应的线程池)则是将Runnable交给指定线程池处理。这便保证了在指定线程获取/处理数据源(observable)。
observeOn
// Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn的线程切换则是在ObservableObserveOn中处理的:
// ObservableObserveOn.java
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里也是根据指定线程类型创建Worker(可参考上面subscribeOn原理),并将observer和w一同放到ObserveOnObserver中:
// ObserveOnObserver.java
public void onNext(T t) {
....
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
...
for (;;) {
...
for (;;) {
...
a.onNext(v);
}
...
}
}
可以看到,onNext执行的是schedule,而schedule则是将该对象直接放到指定线程的Worker中,然后在run中去执行对应的事件处理方法(onNext等),因此便实现了将下游的observer放到指定线程执行的目的,当然,这里只是将其直接下游的observer放到指定线程而已,对于其下游的下游则不一定。也就是说,observeOn可以有多个,每个都是对其直接下游做线程切换,若是下游不再切换,则所有下游都在该指定线程中执行。
总结:observeOn其实就是将下游的observer放到指定线程里面去执行。
3、数据变换
在事件从源头流到最终观察者的过程中,我们可以对事件进行操作转换,这里以map和flatMap为例进行解析。
map
// Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
执行map操作时其实是将上游的observable和我们自己实现的mapper封住成一个ObservableMap,所以具体实现就是在ObservableMap的subscribeActual中:
// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
看到这句是不是很熟悉,从上面的订阅原理可知,到这里其实真正执行的便是MapObserver中的onNext等方法了,而其onNext里便是先执行map转换,再将转换结果交给下游的observer执行:
// MapObserver.java
@Override
public void onNext(T t) {
if (done) { // error or complete
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
flatMap
我们调用的flatMap最终执行的是该重载方法:
// Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
因此看下ObservableFlatMap:
// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
所以实际的操作是在MergeObserver,这里我们就看下onNext就好了:
// MergeObserver.java
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
可以看到,这里是想将上游传进来的对象通过我们自己实现的mapper进行转换,然后再执行subscribeInner(p)(maxConcurrency默认就是Integer.MAX_VALUE),因此看下subscribeInner(p):
// MergeObserver.java
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
这里是个无线循环,根据是否为Callable类型执行不同的逻辑,一般Observable.just为Callable,而from类型的则不是。这里看下from的逻辑,毕竟Callable类型又没指定maxConcurrency的话,是直接break,所以没什么好看的。而非Callable类型的,可以看到这里又封装成了InnerObserver,而for循环并没有什么用。
总结:在执行操作符方法(如map、flatMap等)时,会生成对应的Observable对象,在该对象中实现具体业务逻辑,对上游流下来的数据进行操作,再将处理后的结果交给下游的的订阅者继续处理。
4、总结
RxJava2事件流的产生由subscribe方法调用subscribeActual(observer)触发,而subscribeActual由Observable子类实现,每个子类里的实现逻辑不同,可能会先执行自己的操作(如map或flatMap等 ),但最终都会调用source.subscribe,source即为该节点的上游数据源,因此需要上游操作执行完才能拿到source,最终便形成逐级逆向向上获取数据源(Observable或Flowable),即形成了从最开始的源头发射数据一路向下经过各个节点的操作后交给最终观察者的链式模型。
而对于线程切换,subscribeOn即是将订阅操作(observer)放到Runnable中执行,并将Runnable放到指定线程里操作;observeOn则是将下游的observer放到指定线程里面去执行。
5、参考资料
https://blog.csdn.net/reakingf/article/details/84845705
What’s different in 3.0