RxJava2源码分析——线程切换

932 阅读11分钟

本文章主要是对RxJava2线程切换流程进行源码分析,在阅读之前,可以先阅读以下文章:

RxJava2源码分析——订阅

本文章用的RxJavaRxAndroid版本如下:

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

我们先写段示例代码,代码如下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();
  
    Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
        // 切换上游Observable到io线程
        .subscribeOn(Schedulers.io())
        // 切换下游Observer到主线程,使用AndroidSchedulers.mainThread需要使用RxAndroid这个库
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
            }
        });

源码分析

首先我们看下Schedulers这个类。

Schedulers

阅读源码后,我们可以得知,总共有5种类型。

computation

@NonNull
public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}

该方法返回一个默认、共享的调度器实例用于计算工作,这可以用于事件循环处理回调其他计算工作

io

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

该方法返回一个默认、共享的调度器实例用于IO绑定的工作,这可以用于异步执行阻塞IO,默认是由单线程实例池实现的,可以重用已经启动的线程,要注意的是,这个调度器的线程数量可能会无限制增长,从而导致内存溢出(OOM)

trampoline

@NonNull
public static Scheduler trampoline() {
    return TRAMPOLINE;
}

该方法返回一个默认、共享的调度器实例,用于队列工作,并以FIFO方式在一个参与线程中执行它们,也就是说会等到当前线程执行完毕才会执行下个线程。

newThread

÷@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

该方法返回一个默认、共享的调度器实例,该实例为每个工作单元创建一个新线程,默认实现是创建一个新的单线程,要注意的是,每次调用Scheduler.scheduleDirect方法(及其重载方法)和Scheduler.createWorker方法都可以创建数目无限制的线程,从而造成内存溢出(OOM)

single

@NonNull
public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}

该方法返回一个默认、共享的调度器实例,该实例会创建一个单独的线程。

负责线程切换有两个方法:subscribeOnobserveOn

subscribeOn

这个方法负责切换上游Observable的线程,代码如下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

根据上篇文章阅读subscribe方法源码的经验,我们只看ObservableSubscribeOn类就可以了,要注意的点我都写上注释了,代码如下:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // source是上游Observable
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 创建SubscribeOnObserver对象,传入下游Observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        // 创建SubscribeTask任务,使用指定的调度器进行调度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        // 省略部分代码
    }

    // SubscribeTask继承Runnable,所以我们可以看下它的run方法
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 这里已经切换到想要的线程了,source是上游Observable,调用它的subscribe方法,并且传入下游observer,根据上篇文章的经验,上游Observable的subscribeActual方法会被执行
            source.subscribe(parent);
        }
    }
}

我们的示例代码中调用subscribeOn方法传入的是Schedulers.io(),看下这个方法对应的源码,代码如下:

// Schedulers.java
@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

IO是一个final的静态变量,它是通过Schedulers这个类的静态代码块赋值的,代码如下:

static {
    // 省略部分代码

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    // 省略部分代码
}

它会创建一个IOTask对象,代码如下:

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

这个类实现了Callable接口,并且重写了call方法,返回IoHolder.DEFAULT,代码如下:

// DEFAULT是final的静态类IoHolder里的final的静态变量
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

我们看到这里创建了一个IoScheduler对象,代码如下:

// IoScheduler.java
static final RxThreadFactory WORKER_THREAD_FACTORY;

static {
        // 省略部分代码

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

        // RxThreadFactory是一个线程工厂,可以删除对new Thread调用的硬连接
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

        // 省略部分代码
        // 创建CachedWorkerPool对象,第二个参数是传入TimeUnit,如果是null的话,是不会创建线程池的,下面会讲到
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }

// IoScheduler的构造方法
public IoScheduler() {
    // 这里会调用下面那个方法
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    // 赋值给成员变量threadFactory
    this.threadFactory = threadFactory;
    // 用CachedWorkerPool创建一个原子引用
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    // 调用start方法
    start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    // compareAndSet方法第一个参数是预期值,第二个参数是新值,如果NONE==update的话,就会将值原子性地设置会更新值,并且返回true,否则不会更新,并且返回false,然后调用shutdown方法
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

static final class CachedWorkerPool implements Runnable {
    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            // 当unit不是null的话,就会创建一个newScheduledThreadPool线程池
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }
}

我们再回到上面说的ObservableSubscribeOn类,看到如下这段代码:

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    // 调用了scheduler的scheduleDirect方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

我们再看下scheduleDirect方法,代码如下:

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 调用createWorker方法,createWorker是个抽象方法,刚才我们所说的IoScheduler是Scheduler的实现类,它重写了createWorker方法
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // DisposeTask实现了Runnable接口
    DisposeTask task = new DisposeTask(decoratedRun, w);

    // 调用worker的scheduler方法
    w.schedule(task, delay, unit);

    return task;
}

我们再看下createWorker方法,代码如下:

// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    // 创建EventLooperWork,并且传入从原子引用得到的当前的值
    return new EventLoopWorker(pool.get());
}

EventLoopWorkerIoScheduler的一个final静态内部类,继承Scheduler.Worker,代码如下:

// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        // 如果已经取消订阅了,就不再调度
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        // 调用了ThreadWorker的scheduleActual方法
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

// ThreadWorker继承NewThreadWorker
static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

我们看下NewThreadWorkerscheduleActual方法,代码如下:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    // ScheduledExecutorService是个接口,继承ExecutorService接口
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // 调用SchedulerPoolFactory的create方法,创建线程池
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    // 省略部分代码

    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            // executor.submit和excutor.schedule其实最后会调用同一个方法,执行这个方法后任务就提交上去了
            if (delayTime <= 0L) {
                // 如果不需要延迟就调用submit方法,提交一个有返回结果的任务
                f = executor.submit(task);
            } else {
                // 如果需要延迟就调用schedule方法,提交一个有返回结果的任务
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    // 省略部分代码
}

到这里,上游Observable的代码就会被切换到对应的线程了,我们这里是拿**Schedulers.io()**作为例子来讲解,其他类型大家可以自己看下源码。

结论:订阅事件从下往上传递的,最终传递到上游Observablesubscribe方法。

observeOn

这个方法负责切换下游Observer的线程,代码如下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    // 调用下面那个方法
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

也像之前那样,我们只需要看ObservableObserveOn这个方法,代码如下:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source是上游Observable
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 判断要指定的调度器是不是TrampolineScheduler,也就是是不是传入Schedulers.trampoline()
        if (scheduler instanceof TrampolineScheduler) {
            // 如果是,就直接调用subscribe方法,因为TrampolineScheduler是在当前线程调度的,上面也提及过
            source.subscribe(observer);
        } else {
            // 如果不是,就通过调度器创建worker,然后调用subscribe方法传入创建的ObserveOnObserver对象
            Scheduler.Worker w = scheduler.createWorker();

            // 与subscribeOn不同,subscribe方法不是在已经切换好的线程中执行
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    // ObserveOnObserver是一个final的静态内部类,实现了Runnable接口,所以我们看下它的run方法
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        // 省略部分代码

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                // 如果checkTerminated方法返回false就会return
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    // 最后调用下游Observer的onNext方法
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        // 省略部分代码

        @Override
        public void run() {
            // 到这里已经切换到想要的线程了,outputFused变量是通过requestFusion设置的
            if (outputFused) {
                drainFused();
            } else {
                // 我们主要看这个方法
                drainNormal();
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                // delayError在我们调用的observeOn方法中是传入false的
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        // 如果Throwable不是null的话,就会调用下游Observer的onError方法
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        // 如果任务队列是空的话,证明任务执行完毕,就会调用下游Observer的onComplete方法
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

        // 这个方法和背压(Backpressure)有关系,不是本文章的主要内容,暂时不讨论
        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

        // 省略部分代码
    }
}

结论:观察事件从上往下传递的,最终传递到下游Observer的回调方法,例如:onNext方法、onComplete方法、onError方法,注意onSubscribe方法所在的线程是当前的线程,不会随着订阅线程或者观察线程的切换而改变。

多次调用subscribeOn方法,切换订阅线程

我们试下多次调用subscribeOn方法,把示例代码改成如下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();

    Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
        .subscribeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
            }
        });

Log如下:

subscribeOnLog.png

根据之前的源码分析,其实它像如下代码:

new Thread("AndroidSchedulers.mainThread()") {
    @Override
    public void run() {
        new Thread("Schedulers.io()") {
            @Override
            public void run() {
                System.out.println("上游Observable的subscribe方法所在的线程:" + getName());
            }
        }.start();
    }
}.start();

Log如下:

subscribeOnDemoLog.png

结论:如果我们多次调用subscribeOn方法,切换订阅线程的话,上游Observablesubscribe方法所在的线程只会是在第一次切换的线程,上面也提到过了,因为订阅事件从下往上传递的,最终传递到上游Observablesubscribe方法。

多次调用observeOn方法,切换观察线程

我们试下多次调用obsesrveOn方法,把示例代码改成如下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();

    Log.i("TanJiaJun", "subscribe方法所在的线程:" + Thread.currentThread().getName());
})
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的线程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的线程:" + Thread.currentThread().getName());
            }
        });

Log如下:

observeOnLog.png

根据之前的源码分析,其实它像如下代码:

new Thread("AndroidSchedulers.mainThread()") {
    @Override
    public void run() {
        new Thread("Schedulers.io()") {
            @Override
            public void run() {
                System.out.println("下游Observer的回调方法所在的线程:" + getName());
            }
        }.start();
    }
}.start();

Log如下:

observeOnDemoLog.png

结论:如果我们多次调用observeOn方法,切换观察线程的话,下游Observer的回调方法,例如:onNext方法、onComplete方法、onError方法,它们所在的线程会随着每次切换而切换,因为观察事件从上往下传递的,最终传递到下游Observer的回调方法。

Demo:RxJavaDemo

我的GitHub:TanJiaJunBeyond

Android通用框架:Android通用框架

我的掘金:谭嘉俊

我的简书:谭嘉俊

我的CSDN:谭嘉俊