友好 RxJava2.x 源码解析(二)线程切换

4,562 阅读14分钟

系列文章:

本文 csdn 地址:友好 RxJava2.x 源码解析(二)线程切换

本文基于 RxJava 2.1.3

前言

本文基于读者会使用 RxJava 2.x 而讲解,基本原理不涉及,示例只纯粹为示例而示例。

示例代码

示例源码:
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    Log.e("TAG", "subscribe(): 所在线程为 " + Thread.currentThread().getName());
                    emitter.onNext("1");
                    emitter.onComplete();
                }
            })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext(): 所在线程为 " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete(): 所在线程为 " + Thread.currentThread().getName());
            }
        });

输出结果:

E/TAG: onSubscribe(): 所在线程为 main
E/TAG: subscribe(): 所在线程为 RxCachedThreadScheduler-1
E/TAG: onNext(): 所在线程为 main
E/TAG: onComplete(): 所在线程为 main

源码解析

我们可以发现,除了 Observable 的 subscribe(ObservableEmitter) 方法执行在 io 线程,Observer 的方法都是执行在 main 线程的,接下来就请各位读者跟着笔者来分析了。

Observer#onSubscribe(Dispose)

看到标题部分读者就疑惑了,明明是说线程切换,跟 Observer#onSubscribe() 方法有什么关系呢?前方的 log 中展示 Observer#onSubscribe() 方法在主线程执行的,但是这个主线程是由 .observeOn(AndroidSchedulers.mainThread()) 所导致的吗?为了解决这个疑惑,我们可以在外面套一个子线程,然后去执行该逻辑,代码如下:

new Thread() {
    @Override
    public void run() {
	    Log.e("TAG", "run: 所在线程为 " + Thread.currentThread().getName());
        // 添加示例代码
    }
}.start();

打印结果:

run: 所在线程为 Thread-554
onSubscribe(): 所在线程为 Thread-554
subscribe(): 所在线程为 RxCachedThreadScheduler-1
onNext(): 所在线程为 main
onComplete(): 所在线程为 main

所以实际上 Observer#onSubscribe() 的执行线程是当前线程,它并不受 subscribe(Scheduler)observeOn(Scheduler) 所影响(因为笔者这段代码写在了 Android 主线程当中,所以当前线程是主线程)。本文不在此扩展原因,具体源码追溯和查看前一篇文章,简而言之—— subscribe(Observer) -> subscribeActual(Observer) -> Observer#onSubscribe(),我们可以看到 subscribe(Observer) 的执行线程是当前线程,而在上面所述的数据流中也不存在数据切换的过程,所以 onSubscribe() 执行的线程也是当前线程。

Observable#observeOn(Scheduler)

此小节针对 Observable#observeOn(Scheduler) 讲解,所以将示例代码更改如下:

new Thread() {
    @Override
    public void run() {
        Log.e("TAG", "run: 当前默认执行环境为 " + Thread.currentThread().getName());
        Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                }
            })
            // 仅保留 observeOn(Scheduler)
            .observeOn(Schedulers.io())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext(): 所在线程为 " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    }
} .start();

输出结果:

E/TAG: run: 当前默认执行线程为 Thread-610
E/TAG: onNext(): 所在线程为 RxCachedThreadScheduler-1

不作用上游 Observable

同样的,直接先进入 Observable#observeOn(Scheduler) 源码查看一下,发现其最终会调用 Observable#的observeOn(Scheduler, boolean, int) 方法,该方法将会返回一个 Observable 对象。那么老问题来了,是哪个 Observable 对象调用的 observeOn() 方法,又返回了一个怎样的 Observable 对象?

第一个问题很简单,是 Observable.create(ObservableOnSubscribe) 对象返回的一个 Observable,而且这个 Observable 是一个 ObservableCreate 对象(这里不理解的可以查看第一篇文章)。但是 Observable#observeOn(Scheduler, boolean, int) 是没有被任何子类重写的,这意味着它的子类都是调用它的该方法。

第二个问题来了,返回了一个怎样的 Observable 对象呢?实际上这里的分析流程和第一篇文章中所阐述的流程是一模一样的,我们戳进 Observable#observeOn(Scheduler, boolean, int) 源码,发现它最终会返回一个 new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize) 对象,这里我们只关注前两个对象,第一个参数 this 是指上游的 Observable 对象,也就是我们第一个问题中所涉及到的 Observable 对象,第二个参数 scheduler 毋庸置疑就是我们所传入的 Scheduler 对象了,在此也就是我们的 AndroidSchedulers.mainThread()

通过第一篇的学习,我们应该会轻车熟路地打开 ObservableObserveOn 类并查看它的核心 subscribeActual() 方法以及构造函数——

final Scheduler scheduler;
final boolean delayError;
final int bufferSize;

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 如果传入的 scheduler 是 Scheduler.trampoline() 的情况
    // 该线程的意义是传入当前线程,也就是不做任何线程切换操作
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

直接进入第二个 case,首先先略去第19行代码,看到第20行代码,source(上游 Observable) 和 Observable#subscribe() 操作都没有任何变化,唯一改变的地方就是将 Observer 进行了封装,所以我们可以因此得出结论, Observable#observeOn(Scheduler) 并不会对上游线程执行环境有任何影响。(如果看到这里不能够理解的话,后文中会有通俗易懂的伪代码辅助理解)

作用下游 Observer

经过上文友好 RxJava2.x 源码解析(一)基本订阅流程一文的分析我们知道 ObservableEmitter 的 onNext(T) 方法会触发「下游」 Observer 的 onNext(T) 方法,而此时的「下游」 Observer 对象是经过 Observable#observeOn(Scheduler) 封装的 ObserveOnObserver 对象,所以我们不妨打开 ObserveOnObserver 的 onNext(T) 方法——

@Override
public void onNext(T t) {
    // 删除无关源码
    queue.offer(t);
    schedule();
}

可以看到 onNext(T) 方法做了两件事——一是将当前方法传入的对象添加进队列;另一是执行 schedule() 方法,打开 schedule() 方法源码——

void schedule() {
    // 删除无关源码
    worker.schedule(this);
}

所以将会执行 worker.schedule(Runnable),可向下继续追溯到 schedule(Runnable, long, TimeUnit ) ,该方法是一个抽象方法,所以我们可以想到,调度器们就是通过实现该方法来创建各色各样的线程的。所以我们继续追溯到 IoScheduler 的 schedule(Runnable, long, TimeUnit) 中,源码如下:

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        // 删除无关源码
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }

继续追溯下去——

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit((Callable<Object>)sr);
    } else {
        f = executor.schedule((Callable<Object>)sr, delayTime, unit);
    }
    sr.setFuture(f);

    return sr;
}

executor 是一个 ScheduledExecutorService 对象,而 ScheduledExecutorService 的父接口是我们所熟悉的 ExecutorService 接口,所以很清晰 ScheduledExecutorService 具有创建和调度线程的能力,而其具体的实现在此就不讨论了。

最后,我们不妨将上述所提到的几段源代码整体抽象结合一下:

@Override
public void onNext(T t) {
    // 删除无关源码
    if (delayTime <= 0) {
        f = executor.submit((Callable<Object>)this);
    } else {
        f = executor.schedule((Callable<Object>)this, delayTime, unit);
    }
}

总结一下:onNext(T) 方法会触发 Scheduler 对象的 schedule(Runnable, long, TimeUnit) ,该方法是一个抽象方法,由子类实现,所以才有了多元多样的 Schedulers.io()/Schedulers.computation()/Schedulers.trampoline() 等调度器,具体调度器的内部会使用相关的线程来 submit() 或者 schedule() 任务。解决完调度器的问题,那么接下来就是看看 Runnable#run() 里面的逻辑是什么样的,回到 ObserveOnObserver 中——

@Override
public void run() {
    drainNormal();
}

drainNormal() 源码如下:

void drainNormal() {
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        T v;

        try {
            v = q.poll();
        } catch (Throwable ex) {
        }
        boolean empty = v == null;

        if (empty) {
            break;
        }

        a.onNext(v);
    }
}

可以看到实际上最后一行执行了 Observer#onNext(T) 方法,也就是意味着「ObserveOnObserver 中触发下一层 Observer 的 onNext(T) 操作」在指定线程执行,也就达到了切换线程的目的了。

来个复杂的例子——

示例图

经过友好 RxJava2.x 源码解析(一)基本订阅流程一文我们知道,Observer 的传递是由下往上的,从源头开始,我们自定义的 Observer 向上传递的时候到达第六个 Observable 的时候被线程封装了一层,我们不妨使用伪代码演示一下——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}

Observer 继续向上被传递,Observable#map() 中并未对 Observer 进行线程切换;再向上走,到达第四个 observeOn(Scheduler) 的时候,被 computation 线程嵌套了一层——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其他操作
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其他操作
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其他操作
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}

当然,继续向上直到顶端 Observable——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其他操作
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其他操作
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其他操作
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}

甚至更精简的操作如下:

new Thread("Scheduler io") {
    @Override
    public void run() {
        // flatMap() 操作
        flatMap();
        System.out.println("flatMap 操作符执行线程:" + Thread.currentThread().getName());
        System.out.println("第二个 observeOn() 执行线程:" + Thread.currentThread().getName());
        // 第二个 observeOn() 操作
        new Thread("Scheduler computation") {
            @Override
            public void run() {
                // map() 操作
                map();
                System.out.println("map 操作符执行线程:" + Thread.currentThread().getName());
                System.out.println("第三个 observeOn() 执行线程:" + Thread.currentThread().getName());
                // 第三个 observeOn() 操作
                new Thread("Android mainThread") {
                    @Override
                    public void run() {
                        // Observer#onNext(T)/onComplete()/onError() 执行线程
                        System.out.println("Observer#onNext(T)/onComplete()/onError() 执行线程:" +
                                           Thread.currentThread().getName());
                    }
                } .start();
            }
        } .start();
    }
} .start();

输出结果:

flatMap 操作符执行线程:Scheduler io
第二个 observeOn() 执行线程:Scheduler io
map 操作符执行线程:Scheduler computation
第三个 observeOn() 执行线程:Scheduler computation
Observer#onNext(T)/onComplete()/onError() 执行线程:Android mainThread

由此便将 Observable#observeOn(Scheduler) 是如何将下游 Observer 置于指定线程执行的流程分析完了。简而言之 Observable#observeOn(Scheduler) 的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete() 置于指定线程中运行

这里特别要注意的一点是——【线程操作符切换的是其他的流,自身这条流是不会受到影响的。】看过知乎前一段时间的 rx 分享视频的小伙伴应该有注意到杨凡前辈的 PPT 中有这么一图:

这里写图片描述
想要提出两点——observeOn(Schedulers.io()) 所对应的 Observable 应该是受到了 subscribeOn(AndroidSchedulers.mainThread()) 影响,所以它创建的这条流应该执行于主线程;而 subscribeOn(AndroidSchedulers.mainThread()) 所对应的 Observable 则受到了 subscribeOn(Schedulers.computation) 影响,所以它创建的这条流应该执行于 computation 线程。

Observable#subscribeOn(Scheduler)

切换 subscribe 线程

示例代码:
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
            Log.e("TAG", "被观察者所在的线程 " + Thread.currentThread().getName());
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            Log.e("TAG", "观察者所在线程为 " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });

输出结果:

E/TAG: onSubscribe: main
E/TAG: 观察者所在线程为 RxCachedThreadScheduler-1
E/TAG: 被观察者所在的线程 RxCachedThreadScheduler-1

同样地,戳进 Observable#subscirbeOn(Scheduler) 源码,点进 ObservableSubscribeOn 查看 subscribeActual(Observer) 的具体实现,相信这对于各位读者来说已经轻车熟路了——

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    Disposeable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
    parent.setDisposable(disposable);
}

第一行老套路,对下游 Observer 进行了一层封装;第二行因为它不涉及线程切换所以此处也不做扩展;第三行就是我们的关键了 Scheduler#scheduleDirect(Runnable) 方法可以追溯到 Scheduler#schedule(Runnable, long, TimeUnit),这部分在前面已经阐述过了,就不做扩展了。SubscribeTask 是一个 Runnable,它的 run() 核心方法——

@Override
public void run() {
	source.subscribe(parent);
}

至此谜团解开了,Observable#subscribeOn(Scheduler)Observable#subscribe(Observer) 的执行过程移到了指定线程(在上述中也就是 io 线程),同时 Observable 和 Observer 中并未做新的线程切换处理,所以它们的订阅、发射等操作就执行在了 io 线程。

第一次有效原理

示例代码:
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
            Log.e("TAG", "被观察者所在的线程 " + Thread.currentThread().getName());
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.computation())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            Log.e("TAG", "观察者所在线程为 " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });

打印结果:

onSubscribe: main
观察者所在线程为 RxCachedThreadScheduler-1
被观察者所在的线程 RxCachedThreadScheduler-1

我们知道,只有第一个 Observable#subscribeOn(Scheduler) 操作才有用,而后续的 Observable#subscribeOn(Scheduler) 并不会影响整个流程中 Observerable 。同样的,来张图——

示例图

前面我们分析到,Observable#subscribeOn(Scheduler) 实际上是将 Observable#subscribe(Observer) 的操作放在了指定线程,而通过友好 RxJava2.x 源码解析(一)基本订阅流程一文我们知道了 subscribe 的过程是由下往上的。所以首先是第三个 Observable 调用 Observable#subscribe(Observer) 启动订阅,在其内部会激活第二个 Observable 的 Observable#subscribe(Observer) 方法,但是此时该方法外部被套入了一个 Schedulers.computation() 线程,于是这个订阅的过程就被运行在了该线程中。同样的,我们不妨用伪代码演示一下——

public class Observable {
    // 第「二」个 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("computation") {
            @Override
            public void run() {
                // 第「二」个 Observable 订阅
                source.subscribe(observer);
            }
        }
    }
}

再往上走,第二个 Observable 订阅内部会激活第一个 Observable 的 Observable#subscribe(Observer) 方法,同样的,该方法被套在了 Schedulers.io() 线程中,如下——

public class Observable {
    // 第「一」个 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("io") {
            @Override
            public void run() {
                // 第「一」个 Observable 订阅
                source.subscribe(observer);
            }
        }
    }
}

此时到达第一个 Observable 了之后就要开始发射事件了,此时的执行线程很明显是 io 线程。还可以换成 Thread 伪代码来表示 ——

new Thread("computation") {
    @Override
    public void run() {
        // 第二个 Observable.subscribe(Observer) 的实质
        // 就是切换线程,效果类似如下
        new Thread("io") {
            @Override
            public void run() {
                // 第一个 Observable.subscribe(Observer) 的实质
                // 就是发射事件
                System.out.println("onNext(T)/onError(Throwable)/onComplete() 的执行线程是: " + Thread
                                   .currentThread().getName());
            }
        } .start();
    }
} .start();

输出结果:

onNext(T)/onError(Throwable)/onComplete() 的执行线程是: io

Observable#observeOn(Scheduler) 和 Observable#subscribeOn(Scheduler)

如果针对前面的内容你已经懂了,那么后续的内容可以直接跳过啦,本文就结束了~如果你还没懂,笔者再汇总一次。

经过友好 RxJava2.x 源码解析(一)基本订阅流程一文我们知道,Observable#subscribe(Observer) 的顺序是由下往上的,本游会将 Observer 进行「封装」,然后「激活上游Observable 订阅这个 Observer」

我们不妨抽象一个 Observer,如下:

public class Observer<T> {
    public void onNext(T t){}
    public void onCompelete(){}
    public void onError(Throwable t){}
}

对于 Observable#observeOn(Schedulers.computation()) 操作来说,它对 Observer 进行了怎样的封装呢?

public class NewObserver<T> {
    // 下游 Observer
    Observer downStreamObserver;
    public NewObserver(Observer observer) {
        downStreamObserver = observer;
    }

    public void onNext(T t) {
        new Thread("computation") {
            downStreamObserver.onNext(t);
        }
    }

    public void onError(Throwable e) {
        new Thread("computation") {
            downStreamObserver.onError(e);
        }
    }

    public void onComplete() {
        new Thread("computation") {
            downStreamObserver.onComplete();
        }
    }
}

Observable#observeOn(Scheduler) 内部,其对下游的 Observer 进行了类似如上的封装,这就导致了其「下游」 Observer 在指定线程内执行。所以 Observable#observeOn(Scheduler) 是可以多次调用并有效的。

而对于 Observable#subscribe(Scheduler) 来说,它并未对下游 Observer 进行封装,但是对于「激活上游 Observable 订阅这个 Observer」这个操作它做了一点小小的手脚,也就是切换线程,我们抽象如下——

public class ComputationObservable {
    public void subscribe(observer) {
        new Thread("computation") {
            // upstreamObservable 是上游 Observable,我们不妨假设是下文中所提到的 IOObservable
            upstreamObservable.subscribe(observer);
        }
    }
}

而当它在往上遇到了一个新的 Observable#subscribe(Scheduler) 操作的时候——

public class IOObservable {
    public void subscribe(observer) {
        new Thread("io") {
            // upstreamObservable 是上游 Observable,我们不妨下文中所提到的 TopObservable
            upstreamObservable.subscribe(observer);
        }
    }
}

我们不妨假设此时已经到达了最顶端开始发射事件了——

public class TopObservable {
    public void subscribe(observer) {
        observer.onNext(t);
    }
}

此时的 Observer#onNext(t) 的执行环境当然就是由最后一个 subscribeOn(Scheduler) 操作符(此处的最后一个是指订阅流程中的最后一个,它与实际写代码的顺序相反,也就是我们代码中的第一个 subscribeOn(Scheduler) 操作符)所决定的了,在上述伪代码中也就是 io 线程,伪代码对应的源码如下——

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.computation())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });