浅谈 RxJava 中的线程管理

2,711 阅读8分钟
原文链接: zjutkz.net

上一篇文章研究了一下RxJava中的多线程并发问题,并且在实际项目中运用了其中的知识,这次让我们来谈一谈另外一个牵扯到线程的问题——那就是RxJava中的线程管理。

之所以写这篇文章,还是因为在项目中使用RxJava的时候遇到了一些线程管理上的问题,经过网上查阅资料并且翻阅源码之后才把问题解决,于是就决定把所感所得记录下来,希望大家看完之后会有收获吧。

前言

首先要说明一点的是,这个不是什么RxJava入门指南,所以不会有很基础的引导代码,如果大家想了解本文中设计的线程切换和管理和操作符转换,请移步到给 Android 开发者的 RxJava 详解这篇文章。

本文将会围绕两个具体的函数:SubscribOn和ObserveOn来进行展开。

SubscribOn

让我们直奔主题,来看看SubscribOn的源码长什么样子:

public final Observable<T> subscribeOn(Scheduler scheduler) {
            if (this instanceof ScalarSynchronousObservable) {
                return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
            }
            return create(new OperatorSubscribeOn<T>(this, scheduler));
        }
        

上面的if判断我们暂且不管,直接看最后的return,可以看到,其实SubscribOn是重新create出了一个Observable,并且传入了自定义的OperatorSubscribeOn作为Observable中的成员变量onSubscribe。所以具体的实现逻辑就是在onSubscribe中了。

@Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
            subscriber.add(inner);
            inner.schedule(new Action0() {
                @Override
                public void call() {
                    final Thread t = Thread.currentThread();
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }
                        @Override
                        public void onError(Throwable e) {
                            try {
                                subscriber.onError(e);
                            } finally {
                                inner.unsubscribe();
                            }
                        }
                        @Override
                        public void onCompleted() {
                            try {
                                subscriber.onCompleted();
                            } finally {
                                inner.unsubscribe();
                            }
                        }
                        @Override
                        public void setProducer(final Producer p) {
                            subscriber.setProducer(new Producer() {
                                @Override
                                public void request(final long n) {
                                    if (t == Thread.currentThread()) {
                                        p.request(n);
                                    } else {
                                        inner.schedule(new Action0() {
                                            @Override
                                            public void call() {
                                                p.request(n);
                                            }
                                        });
                                    }
                                }
                            });
                        }
                    };
                    source.unsafeSubscribe(s);
                }
            });
        }
        

上面是OperatorSubscribeOn中的核心函数call()方法的源码,其中的逻辑还是挺简单的。

首先,通过scheduler创建出一个worker,这个scheduler就是我们在调用subscribOn的时候传入的,基本上我们用到的就是Schedulers.io(),Schedulers.newThread()和Schedulers.computation()三种,分别对应io型,新建型和计算密集型三种类型。

之后的逻辑都是在worker中执行的,相当就是在不同的线程环境中执行了。

public NewThreadWorker(ThreadFactory threadFactory) {
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
            // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
            boolean cancelSupported = tryEnableCancelPolicy(exec);
            if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
                registerExecutor((ScheduledThreadPoolExecutor)exec);
            }
            schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
            executor = exec;
        }
        

我们以NewThreadWorker为例,其中就是通过Executors创建出一个线程池的。

然后让我们来看一下call函数中最核心的一句代码:

source.unsafeSubscribe(s);
        

source表示的是在调用subscribeOn之前的Observable链,在unsafeSubscribe方法中:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                // new Subscriber so onStart it
                subscriber.onStart();
                // allow the hook to intercept and/or decorate
                hook.onSubscribeStart(this, onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
                return Subscriptions.unsubscribed();
            }
        }
        

其实就是去执行了Observable成员变量onSubscribe的call方法。

下面让我们以一个例子来辅助说明。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(createInt());
            }
            })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        Log.d("TAG", "onNext: " + Thread.currentThread());
                    }
                    });
        private int createInt() {
                Log.d("TAG", "createInt: " + Thread.currentThread());
                return 1;
            }
        

需要注意的是,例子中为了方便,我直接使用了Observable.create方法。上之前的文章中我也说过,这是一个不好的习惯,请大家不要模仿,尽可能的不要自己使用Observable.create方法。

上面的代码,log输出的线程都是RxJava的io线程。

我们做一个总结,调用了subscribeOn之后会返回一个新的Observable,我们称之为newObservable,newObservable会有一个OperatorSubscribeOn的成员变量,我们称之为newSubscribeOn。例子中调用链的最后一个方法subscribe就是用的newObservable,从而会走到newSubscribeOn的call方法。而newSubscribeOn中通过source保存了subscribeOn之前的Observable链,我们称之为oldObservable,于是在newSubscribeOn中通过worker,将oldObservable放置到对应的线程中执行(调用unsafeSubscribe方法),从而调用oldObservable中的成员变量onSubscribe,我们称之为oldSubscribeOn的call方法。

也许文字说明太过繁琐,让我们用一张图来展示subscribeOn的原理:

subscribeOn

observeOn

说完了subscribeOn,下面让我们来说一说observeOn。如果你认为两者的差距不大,仅仅是字面上的差异,那你就大错特错了,它们两个操作符有着本质上的区别。

public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, RxRingBuffer.SIZE);
        }
        public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
            return observeOn(scheduler, false, bufferSize);
        }
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            if (this instanceof ScalarSynchronousObservable) {
                return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
            }
            return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
        }
        

上面是observeOn的源码,可以看到最终调用了lift。看到这里就知道为什么subscribeOn和observeOn有着本质上的差别了:subscribeOn是通过创建新的Observable来做到切换线程的,而observeOn则是通过operator,操作符的进行来实现的。

@Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {
            if (scheduler instanceof ImmediateScheduler) {
                // avoid overhead, execute directly
                return child;
            } else if (scheduler instanceof TrampolineScheduler) {
                // avoid overhead, execute directly
                return child;
            } else {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
                parent.init();
                return parent;
            }
        }
        

上面是OperatorObserveOn的核心函数call()方法。我们关注最后的else分支,和一般的operator一样,OperatorObserveOn通过创建新的subscriber来包裹旧的subscriber。

@Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        
// only execute this from schedule()
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)
            for (;;) {
                long requestAmount = requested.get();
                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;
                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    localChild.onNext(localOn.getValue(v));
                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }
                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }
                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }
        

上面是ObserveOnSubscriber的核心函数onNext()和call()方法。首先判断生命周期和背压,一切正常的情况下调用了schedule()方法,之后再通过call()方法调用被其包裹的子subscriber,并辅以scheduler的线程操作。

下面还是用图的方式来理解一下observeOn的过程:

observeOn

对比

根据上面的源码加图的分析我们可以知道,subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的,相当于是层层包裹,层层回调,所以当你在一个调用链中调用多次subscribeOn是无效的,因为层层回调之后,只有最终的那一个(也就是第一个subscribeOn)才会生效:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(createInt());
            }
            })
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        Log.d("TAG", "onNext: " + Thread.currentThread());
                    }
                });
                

上面的这个调用链,函数的执行都是在io线程中发生的,也就是只有第一个subscribeOn生效。

这里还要一点需要注意,那就是RxJava中的线程切换都是手动切换的的,不存在自动一说,所以当然调用了subscribeOn或者observeOn之后,它的影响是整条链路,如果你之后不再进行手动切换,线程环境是不会改变的。

下面我们再说observeOn。observeOn是通过operator操作符的形式去完成线程切换的,所以它的作用域和其他操作符一样,是调用observeOn之后的链路,并且由于是通过操作符的形式,所以observeOn是可以多次调用的:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(createInt());
            }
            })
                .observeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        Log.d("TAG", "onNext: " + Thread.currentThread());
                    }
                });
                

上面的代码,Action1中的call方法是在新创建出的线程中执行的,这证实了上面说的那一点:observeOn是可以多次调用的。但是createInt函数确实在主线程中执行的,那是因为observeOn本质上是一个operator,它没有能力去影响它上游的链路。

结合使用场景

最后,我们来看一下这两个操作符结合使用的情况:

private int createInt() {
            Log.d("TAG", "createInt: " + Thread.currentThread());
            return 1;
        }
        
Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(createInt());
            }
            })
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        Log.d("TAG", "map: " + Thread.currentThread());
                        return integer + 1;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        Log.d("TAG", "onNext: " + Thread.currentThread());
                    }
                });
                

首先是最简单的一个例子,上面的代码中,createInt和map操作符的call是在io线程中执行的,而subscribe的Action1的call是在主线程中执行的。

结合我们上面一个单元所说的内容,subscribeOn影响的是全链路,所以createInt和map操作符的call被其影响,在io线程中执行,而执行完了map操作符之后,调用了observeOn,由于observeOn的本质是操作符,它会影响下游的链路,所以Action1的call会在主线程中执行。

上面的这个例子我们可以总结出以下几点:

1.subscribeOn影响是整条RxJava链路
        2.observeOn只影响它下游的链路
        3.连续调用subscribeOn是没有用的,只有第一次会生效
        4.连续调用observeOn是有用的,会不断的切换线程。
        5.在observeOn调用之后,subscribeOn的影响就会被observeOn抵消。换句话说,observeOn就是subscribeOn的"拦路虎"