上一篇文章研究了一下RxJava中的多线程并发问题,并且在实际项目中运用了其中的知识,这次让我们来谈一谈另外一个牵扯到线程的问题——那就是RxJava中的线程管理。
之所以写这篇文章,还是因为在项目中使用RxJava的时候遇到了一些线程管理上的问题,经过网上查阅资料并且翻阅源码之后才把问题解决,于是就决定把所感所得记录下来,希望大家看完之后会有收获吧。
前言
首先要说明一点的是,这个不是什么RxJava入门指南,所以不会有很基础的引导代码,如果大家想了解本文中设计的线程切换和管理和操作符转换,请移步到给 Android 开发者的 RxJava 详解这篇文章。
本文将会围绕两个具体的函数:SubscribOn和ObserveOn来进行展开。
SubscribOn
让我们直奔主题,来看看SubscribOn的源码长什么样子:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
上面的if判断我们暂且不管,直接看最后的return,可以看到,其实SubscribOn是重新create出了一个Observable,并且传入了自定义的OperatorSubscribeOn作为Observable中的成员变量onSubscribe。所以具体的实现逻辑就是在onSubscribe中了。
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
上面是OperatorSubscribeOn中的核心函数call()方法的源码,其中的逻辑还是挺简单的。
首先,通过scheduler创建出一个worker,这个scheduler就是我们在调用subscribOn的时候传入的,基本上我们用到的就是Schedulers.io(),Schedulers.newThread()和Schedulers.computation()三种,分别对应io型,新建型和计算密集型三种类型。
之后的逻辑都是在worker中执行的,相当就是在不同的线程环境中执行了。
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
我们以NewThreadWorker为例,其中就是通过Executors创建出一个线程池的。
然后让我们来看一下call函数中最核心的一句代码:
source.unsafeSubscribe(s);
source表示的是在调用subscribeOn之前的Observable链,在unsafeSubscribe方法中:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
其实就是去执行了Observable成员变量onSubscribe的call方法。
下面让我们以一个例子来辅助说明。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(createInt());
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
Log.d("TAG", "onNext: " + Thread.currentThread());
}
});
private int createInt() {
Log.d("TAG", "createInt: " + Thread.currentThread());
return 1;
}
需要注意的是,例子中为了方便,我直接使用了Observable.create方法。上之前的文章中我也说过,这是一个不好的习惯,请大家不要模仿,尽可能的不要自己使用Observable.create方法。
上面的代码,log输出的线程都是RxJava的io线程。
我们做一个总结,调用了subscribeOn之后会返回一个新的Observable,我们称之为newObservable,newObservable会有一个OperatorSubscribeOn的成员变量,我们称之为newSubscribeOn。例子中调用链的最后一个方法subscribe就是用的newObservable,从而会走到newSubscribeOn的call方法。而newSubscribeOn中通过source保存了subscribeOn之前的Observable链,我们称之为oldObservable,于是在newSubscribeOn中通过worker,将oldObservable放置到对应的线程中执行(调用unsafeSubscribe方法),从而调用oldObservable中的成员变量onSubscribe,我们称之为oldSubscribeOn的call方法。
也许文字说明太过繁琐,让我们用一张图来展示subscribeOn的原理:
observeOn
说完了subscribeOn,下面让我们来说一说observeOn。如果你认为两者的差距不大,仅仅是字面上的差异,那你就大错特错了,它们两个操作符有着本质上的区别。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
上面是observeOn的源码,可以看到最终调用了lift。看到这里就知道为什么subscribeOn和observeOn有着本质上的差别了:subscribeOn是通过创建新的Observable来做到切换线程的,而observeOn则是通过operator,操作符的进行来实现的。
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
上面是OperatorObserveOn的核心函数call()方法。我们关注最后的else分支,和一般的operator一样,OperatorObserveOn通过创建新的subscriber来包裹旧的subscriber。
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
上面是ObserveOnSubscriber的核心函数onNext()和call()方法。首先判断生命周期和背压,一切正常的情况下调用了schedule()方法,之后再通过call()方法调用被其包裹的子subscriber,并辅以scheduler的线程操作。
下面还是用图的方式来理解一下observeOn的过程:
对比
根据上面的源码加图的分析我们可以知道,subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的,相当于是层层包裹,层层回调,所以当你在一个调用链中调用多次subscribeOn是无效的,因为层层回调之后,只有最终的那一个(也就是第一个subscribeOn)才会生效:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(createInt());
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
Log.d("TAG", "onNext: " + Thread.currentThread());
}
});
上面的这个调用链,函数的执行都是在io线程中发生的,也就是只有第一个subscribeOn生效。
这里还要一点需要注意,那就是RxJava中的线程切换都是手动切换的的,不存在自动一说,所以当然调用了subscribeOn或者observeOn之后,它的影响是整条链路,如果你之后不再进行手动切换,线程环境是不会改变的。
下面我们再说observeOn。observeOn是通过operator操作符的形式去完成线程切换的,所以它的作用域和其他操作符一样,是调用observeOn之后的链路,并且由于是通过操作符的形式,所以observeOn是可以多次调用的:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(createInt());
}
})
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
Log.d("TAG", "onNext: " + Thread.currentThread());
}
});
上面的代码,Action1中的call方法是在新创建出的线程中执行的,这证实了上面说的那一点:observeOn是可以多次调用的。但是createInt函数确实在主线程中执行的,那是因为observeOn本质上是一个operator,它没有能力去影响它上游的链路。
结合使用场景
最后,我们来看一下这两个操作符结合使用的情况:
private int createInt() {
Log.d("TAG", "createInt: " + Thread.currentThread());
return 1;
}
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(createInt());
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
Log.d("TAG", "map: " + Thread.currentThread());
return integer + 1;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
Log.d("TAG", "onNext: " + Thread.currentThread());
}
});
首先是最简单的一个例子,上面的代码中,createInt和map操作符的call是在io线程中执行的,而subscribe的Action1的call是在主线程中执行的。
结合我们上面一个单元所说的内容,subscribeOn影响的是全链路,所以createInt和map操作符的call被其影响,在io线程中执行,而执行完了map操作符之后,调用了observeOn,由于observeOn的本质是操作符,它会影响下游的链路,所以Action1的call会在主线程中执行。
上面的这个例子我们可以总结出以下几点:
1.subscribeOn影响是整条RxJava链路
2.observeOn只影响它下游的链路
3.连续调用subscribeOn是没有用的,只有第一次会生效
4.连续调用observeOn是有用的,会不断的切换线程。
5.在observeOn调用之后,subscribeOn的影响就会被observeOn抵消。换句话说,observeOn就是subscribeOn的"拦路虎"