Rxjava线程切换原理源码解析

1,234 阅读8分钟

RRxjava线程切换原理源码解析

前言

rxjava是一款非常流行的响应式异步框架,可以通过一连串的链式调用风骚地玩转于各种线程之间,避免了让人“闻风丧胆”的回调地狱。

那么这么厉害得框架是怎么做到的呢?

其实在很多技术平台上,都有关于rxjava的源码解析,但笔者看了很多,仍然觉得大多数讲得比较晦涩难懂,大多数分析的时候太深入源码,导致只见树木不见森林,看完好像有点懂了,但却又感觉哪里不对,无法对rxjava有一个明晰的认识。因此,笔者打算尝试尽量屏蔽rxjava的实现细节,而是尽力展示rxjava的实现流程,从而好让读者对rxjava的实现原理有个大概认识后,再结合其他技术博客以及rxjava源码,达到理解rxjava线程切换原理的目的。

本文适合于对rxjava使用比较清晰的读者,对于从没使用过的读者,可以先看后面的参考连接1后,再回来阅读。

rxjava的调用流程

首先,先来看一个简单的rxjava的实现:

        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());

        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            
            @Override
            public void onNext(Integer integer) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        integerObservable.subscribe(observer);

可以将上面这段代码拆分成几个阶段:

  1. Observable.create()会返回一个ObservableCreate对象
  2. subscribeOn()会返回一个ObservableSubscribeOn对象
  3. observeOn()会返回一个ObservableObserveOn对象
  4. integerObservable.subscribe(observer)

前面过程1~3是生成被观察者的过程,看源码就能知道,这个创建过程,是将ObservableCreate逐层进行包装,然后生成新的Observable对象的过程,并且在包装的时候,会将被包装的observable以成员变量的方式存储起来:

可以看出,最终生成的integerObservable的类型是ObservableObserveOn,并且,其内部可以通过成员变量source获得ObservableSubscribeOn,然后再通过ObservableSubscribeOn的成员变量source获得ObservableCreate,最后就可以通过ObservableCreate的成员变量source获得ObservableOnSubscribe对象,是不是有种套娃的赶脚。

好,通过上面的解释,我想大家已经明白这些关键对象的创建过程以及对象之间的引用关系了吧。然后过程4调用后,rxjava的链式调用汽车就要启动了,大家赶快上车

用一张时序图来给大家看,大家会清晰一些

过程1~2上面已经解释了,是通过包装最终创建了ObservableObserveOn对象,其实这个包装并不是像中秋节的月饼包装那样华而不实,而是在内部给observable添加了线程切换功能,这个线程切换的秘密就在于subscribeActual()方法

接下来,我一步步解释过程3~9

过程3

首先过程3的源码是这样的

TrampolineScheduler是什么请大家看链接2,这里就不细说了,大家就假定,这个方法会走第二个判断语句,即source.subscribe()。大家还记得吗,ObservableObserveOn的成员变量source其实就是ObservableSubscribeOn,那么我们就将observer包装了一层后来到了过程4(注意,subscribe()会调用内部方法subscribeActual(),下面如果不是特别说明,都是如此,就不再赘述了)。

过程4

可以看到,过程4里,会把observer封装成一个runnable,然后通过构建过程中subscribeOn(Schedulers.io())传入的scheduler进行线程切换,至于怎么切换,我们先不说,只要记住这里进行了线程切换就可以了,怎么切换的问题,我们放在后面说,以免为了细节实现而跟丢了主体。

接下来,我们就会在io线程下来执行SubscrbeTask,我们来看看这个task的定义

可以看到,在run()方法里,又调用了source.subscribe()方法,还记得吗?ObservableSubscribeOn的成员变量source就是ObservableCreate,也就是说,我们绕了一大圈,终于又回到了ObservableCreate这个对象里,而且很神奇的发现,这个时候我们已经实现了线程切换!因为,source.subscribe()是执行在io线程里的,有图为证:

过程5~7

大家还记得不,ObservableCreate的成员变量source就是ObservableOnSubscribe,也就是说,source.subscribe()这个方法将会执行到这里:

然后其又在内部调用了e.onNext(),其实就是调用CreateEmitter的onNext()方法,让我们跟进去看看CreateEmitter吧

可以看到内部又调用obsever.onNext()方法,这个observer是什么呢?其实细心的读者应该注意到了,上面的过程中每次调用subscribeActual()方法都会将最初的observer包装一层后,再一直传下去,所以,我们也可以类似于上面observable的包装过程一样,用这张图来描述,observer的结构:

因此,可以看出,CreateEmitter的onNext()方法,其实就是执行SubscribeOnObserver的onNext()方法

过程8~9

可以看到,SubscribeOnObserver的onNext()内部其实就是调用了其成员变量actual的onNext()方法,那么接下来再让我们来看看ObserveOnObserver的onNext()又做了些什么吧

大家看明白了吗?其实就是,ObserveOnObserver是一个实现了runnable接口的类,然后其onNext()方法就是将自己添加到worker中去执行,这个worker自然是通过我们最开始设置的observeOn(AndroidSchedulers.mainThread()),也就是主线程了!也就是说,最终drainNormal()会在主线程中调用,并且其内部会调用actual.onNext()方法,这个actual就是最里层的observer

至此,rxjava的线程切换终于讲完了,大家看完是不是会相对清晰一些呢?如果还不了解,可以通过断点来调试一遍,就会清晰很多。

接下来我们再讲一下上面没有说的,线程切换的具体流程

线程切换的细节

我们以上述例子的子线程,也就是IoScheduler为例,讲一下线程切换的细节,切换到主线程的实现原理其实是相似的,并且rxjava是通过handler来切换到主线程的,具体细节读者可以自行去查阅。

好,让我们把时间倒流到过程4

这一步里会调用scheduler.scheduleDirect()方法,内部会调用

createWorker()是一个抽象方法,在我们这个例子里,将会调用IoScheduler.createWorker(),然后返回一个EventLoopWorker对象,这个对象内部维护着一个核心数为1的线程池,最终通过worker.schedule()内部调用线程去执行这个任务,从而做到线程切换。这个线程池创建的过程比较深,涉及多个类,我就不一一贴出来了,大家可以从下图的地方跟进去查看:

额外的知识点

IoSchedule内部维护着一个CacheWorkPool对象,用来缓存worker对象,避免线程池的重复创建,当链式调用走到最后,也就是observer.onComplete()时,会调用IoSchedule.release(),将worker放入缓存list中,然后cacheWorkPool内部有一个线程,每隔60s执行一次run方法,用来检查缓存的worker是否已过期,如果过期的话,就从list里移除,从而实现了线程池的复用和释放。

问题时间
  1. 多次调用subscribeOn(),并且设置不同的Schedules,会有什么效果,subscribe()执行在什么线程?

    类似于下图:

答案是:

每次调用subscribeOn()都会生成一个ObservableSubscribeOn对象,然后会下往上的顺序调用ObservableSubscribeOn.subscribeActual(),直到进行到io线程时,因为包裹的observable对象是observableCreate,然后接下来的流程就是上面提道的流程5~9的执行过程了。也就是说,执行多次subscribeOn(),虽然会执行多次线程切换,但最终subscribe()执行在第一个subscribeOn()指定的线程里,在这个例子就是io线程。

  1. 如果在上面基础上插入flatMap()操作符,又将如何呢?

答案是,仍然是在io线程中执行flatMap()

这是为什么呢?让我们继续深入到源码吧~

首先调用了flatMap()方法后,内部通过一系列重载,最终会生成ObservableFlatMap对象

可以看出,其实创建过程是类似的,也是将上一个Observable包裹一层而已,然后让我们来看看ObservableFlatMap的subscribeActual()及onNext()方法

可以看出subscribeActual()没有执行转换的动作,真正做转换的地方是在onNext()方法里,然后从问题1的结论我们可以知道onNext()方法是运行在io线程的,因此,flatMap()操作符仍然是在onNext()线程执行的。

小练习

下面代码中flatMap()及subscribe()各自是在哪个线程中执行?

Observable.just(1)
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        Log.d(TAG, "flatMapThread = " + Thread.currentThread().getName());
                        return Observable.just(integer);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "subscribeThread = " + Thread.currentThread().getName());
                    }
                });

大家可以自己想一下,然后再跑起来看看结果与自己想的对不对~

码字不易,如果各位读者老爷看完后觉得对您有帮助,那就给我一点鼓励吧~


参考链接:

  1. 给 Android 开发者的 RxJava详解
  2. 详解 RxJava 的消息订阅和线程切换原理