前言
Rxjava 2.x 源码系列 - 变换操作符 Map(上)
在上一篇博客 Rxjava 2.x 源码系列 - 变换操作符 Map(上) 我们讲解到,Observable#subscribeOn 是如何控制上游 Observable 的执行线程的,他的实质是将 Observable#subscribe(Observer) 的操作放在了指定线程,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observable。用下面的流程图表示如下。
接下来,我们先来回顾一下,Observable 与 Observer 之间是如何订阅的
简单来说就是,当我们调用 Observable 的 subsribe 方法的时候,会调用当前对应 observbale 的 subscribeActual 方法,在该方法里面,会调用 observer 的 onSubeciber 方法,并调用对应 ObservableOnSubscirbe 的 subcribe 的方法,并将 ObservableEmitter 作为方法参数暴露出去。而 ObservableEmitter 持有我们的 Observer 的引用,当我们调用 ObservableEmitter 的 onNext,onErrot,onComplete 方法的时候,会调用他持有的 Observer 的相应的方法。
这篇博客主要讲解以下问题:
-
observeOn 是如何控制 Observer 的回调线程的
Observable#observeOn 方法
1@CheckReturnValue2 @SchedulerSupport(SchedulerSupport.CUSTOM)3 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {45 ObjectHelper.requireNonNull(scheduler, "scheduler is null");6 ObjectHelper.verifyPositive(bufferSize, "bufferSize");7 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));8 }
observeOn 的套路跟 Observable.create 方法的套路基本一样,都是先判断是否为空,不为 null,用一个新的类包装起来,并持有上游的引用 source。这里我们的包装类是 ObservableObserveOn。
这里我们来看一下 ObservableObserveOn
1public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { 2 final Scheduler scheduler; 3 final boolean delayError; 4 final int bufferSize; 5 public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { 6 super(source); 7 this.scheduler = scheduler; 8 this.delayError = delayError; 9 this.bufferSize = bufferSize;10 }1112 @Override13 protected void subscribeActual(Observer<? super T> observer) {14 // 如果是当前线程,直接低啊用15 if (scheduler instanceof TrampolineScheduler) {16 source.subscribe(observer);17 } else {18 // 否则,通过 worker 的形式调用19 Scheduler.Worker w = scheduler.createWorker();2021 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));22 }23 }24}
从第一篇博客 Rxjava 2.x 源码系列 - 基础框架分析,我们知道,当我们调用 Observable.subscibe(observer) 方法的时候,会调用到 对应的 Observable 实例的 subscribeActual 方法,而这里我们的 Observable 为ObservableObserveOn 。
在 ObservableObserveOn.subscribeActual 方法中,首先会判断 scheduler instanceof TrampolineScheduler (是否是当前线程),true 的话,会直接调用 source.subscribe(observer)。否则,先用 ObserveOnObserver 包装 observer,再调用 source.subscribe 方法
接下来,我们一起来看一下 ObserveOnObserver 类
1static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> 2 implements Observer<T>, Runnable { 3 4 final Observer<? super T> actual; 5 6 7 } 8 9public abstract class BasicIntQueueDisposable<T>10extends AtomicInteger11implements QueueDisposable<T> {
ObserveOnObserver 继承于 BasicIntQueueDisposable,实现 Observer, Runnable 接口,而 BasicIntQueueDisposable extends AtomicInteger ,是原子操作类。
1其中,还有一个很重要的属性 actual ,即是实际的 observer。
接下来,我们来看一下几个重要的方法:
onNext,onError,onComplete,onSubscribition
1public void onSubscribe(Disposable s) { 2 if (DisposableHelper.validate(this.s, s)) { 3 4 ------- 5 6 if (m == QueueDisposable.SYNC) { 7 sourceMode = m; 8 queue = qd; 9 done = true;10 actual.onSubscribe(this);11 schedule();12 return;13 }14 if (m == QueueDisposable.ASYNC) {15 sourceMode = m;16 queue = qd;17 actual.onSubscribe(this);18 return;19 }20 }2122 queue = new SpscLinkedArrayQueue<T>(bufferSize);2324 actual.onSubscribe(this);25 }26}2728@Override29public void onNext(T t) {30 if (done) {31 return;32 }3334 ------35 schedule();36}3738@Override39public void onError(Throwable t) {40 if (done) {41 RxJavaPlugins.onError(t);42 return;43 }44 ----4546 schedule();47}4849@Override50public void onComplete() {51 if (done) {52 return;53 }54 ----5556 schedule();57}
在 onNext,onError,onComplete 方法中首先都会先判断是否 done,如果没有的话,会调用 schedule() 方法。
1void schedule() {2 if (getAndIncrement() == 0) {3 worker.schedule(this);4 }5}
而在 schedule() 方法中,直接调用 Worker 的 schedule 方法,这样就会执行我们当前 ObserveOnObserver 的 run 方法,
1public void run() {2 if (outputFused) {3 drainFused();4 } else {5 drainNormal();6 }7}
在 drainFused 和 drainNormal 方法中,会根据状态去调用 actual(外部传入的 observer) 的 onNext、onError、onComplete 方法。因此 observer 的回调所在的线程将取决于外部传入的 scheduler 的 schedule 方法所在的线程。
假设我们传入的是 observeOn(AndroidSchedulers.mainThread())
1public final class AndroidSchedulers { 2 3 private static final class MainHolder { 4 5 static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); 6 } 7 8 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( 9 new Callable<Scheduler>() {10 @Override public Scheduler call() throws Exception {11 return MainHolder.DEFAULT;12 }13 });1415 /** A {@link Scheduler} which executes actions on the Android main thread. */16 public static Scheduler mainThread() {17 return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);18 }1920 ---21}
1private static final class HandlerWorker extends Worker { 2 private final Handler handler; 3 4 private volatile boolean disposed; 5 6 HandlerWorker(Handler handler) { 7 this.handler = handler; 8 } 910 @Override11 public Disposable schedule(Runnable run, long delay, TimeUnit unit) {12 if (run == null) throw new NullPointerException("run == null");13 if (unit == null) throw new NullPointerException("unit == null");1415 if (disposed) {16 return Disposables.disposed();17 }1819 run = RxJavaPlugins.onSchedule(run);2021 ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);2223 Message message = Message.obtain(handler, scheduled);24 message.obj = this; // Used as token for batch disposal of this worker's runnables.2526 handler.sendMessageDelayed(message, unit.toMillis(delay));2728 // Re-check disposed state for removing in case we were racing a call to dispose().29 if (disposed) {30 handler.removeCallbacks(scheduled);31 return Disposables.disposed();32 }3334 return scheduled;35 }3637}
从上面的分析我们知道 observer 的回调所在的线程将取决于外部传入的 scheduler 的 schedule 方法所在的线程。即 指定 observeOn(AndroidSchedulers.mainThread()) 之后,将取决于 HandlerWorker 的 schedule 方法执行的线程,在该方法中,很明显执行于主线程。
总结
控制 Observer 的回调实际是放到 ObservableObserveOn 的 run 方法中,即 ObservableObserveOn 的 run 执行在主线程, Observer 的回调也发生在主线程,而 ObservableObserveOn 的 run 执行在哪个线程,取决于 外部传入的 scheduler。因此, 当外部传入的 scheduler 的 schedule 方法在主线程,那么 observer 也在主线程回调。
推荐阅读
Rxjava 2.x 源码系列 - 变换操作符 Map(上)
RecyclerView addItemDecoration 的妙用 - item 间距平均分布和添加分割线
扫一扫,欢迎关注我的公众号。如果你有好的文章,也欢迎你的投稿。