RxJava源码解析(二)—线程调度器Scheduler

724 阅读8分钟

在RxJava中,有个很重要的概念叫做"线程调度器"—Scheduler。它用一种隐式的方法屏蔽掉了我们之前通过回调方式的线程调用。我们先看个例子:

Observable<String> ob = Observable.just("str1","str2");
ob.map(new Func1<String, String>() {
@Override
public String call(String t) {
System.out.println("function call " + Thread.currentThread());
return "[" + t + "]";
}})
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String t) {
System.out.println("onNext call " + Thread.currentThread());
System.out.println("onNext "+t);
}
});

代码中,我们通过一个字符串生成了一个Observable对象,而这个对象我们又通过一个map映射映射成为一个新的Observable对象(这部分的知识请参照第一章RxJava源码解析(一)从一个例子开始)。在这之后,我们有通过调用observeOn方法设置了一个叫做Schedulers.newThread()的调度器。这个函数的目的是为了告诉你的被观察者,当你的数据返回的时候需要往哪个线程上post你的数据消息,换句话说,也就是你所定义的Subscriber对象的onCompleted/onError/onNext的执行线程。这段代码最后输出:

//output:
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str1]
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str2]

本章,我们将重点关注这个调度器,那么我们首先要思考的问题是,这个调度器将会提供什么功能呢?这就要回头看下我们能用这个调度器干什么了?

首先,我们需要调度器去帮助我们生成一个线程
其次,当我们以后得到了结果,我们还要需要调度器往调度器线程中发送一个消息,以便可以执行订阅者的回调函数

好的,基于我们上面的需求,我们将看下,在RxJava的调度器实现中,是如何实现我们所需要的功能的。
我们先来看下Observable对象所提供的observeOn函数,这个函数有多个函数重载,最终都会调用到三个参数的observeOn方法:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

这里调用到了RxJava中一个很重要的操作符号liftlift函数引入了一个叫做Operator的新类型,在上述的例子中这个类型的实现类是一个叫做OperatorObserveOn的策略类型。我们看下这个lift函数定义:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

我们所传入的Operator对象最终被包装成为一个OnSubscribeLift对象,OnSubscribeLift对象是我们非常熟悉的OnSubscribe类型的子类。第一章我们说到OnSubscribe提供一种处理订阅者注册订阅后的策略。按照我们上面的例子,我们调用过map函数后调用observeOn函数,此时传入的onSubscribe对应的就是map产生的OnSubscribeMap对象。而参数operator对应observeOn函数中的OperatorObserveOn对象。我们先来看下Operator类的定义:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

Operator也是一种映射关系函数,转换类型是通过Subscriber<T>->Subscriber<R>。也就是说,Operator是一个直接转化新的Subscriber的映射函数。这样就可以在订阅前拦截订阅操作。比如:

Observable<String> ob = Observable.just("str1","str2");
        ob.map(new Func1<String, String>() {
            @Override
            public String call(String t) {
                System.out.println("function call " + Thread.currentThread());
                return "[" + t + "]";
            }})
        .lift(new Operator<String,String>(){

            @Override
            public Subscriber<String> call(Subscriber<? super String> st) {
                return new Subscriber<String>() {
                    @Override
                    public void onNext(String t) {
                        long startTime = System.currentTimeMillis();
                        System.out.println("onNext begin");
                        st.onNext(t);//用于监控订阅者的执行时间
                        System.out.println("onNext execute on next time = "+(System.currentTimeMillis() - startTime)+"ms");
                    }
                };
        }})
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String t) {
                IO.waitTime(5000);
                System.out.println("call onNext "+t);
            }
        });

比如,我们为了监控订阅者订阅的时候有多少的时间消耗,我们通过lift函数在我们的订阅者外包装了一层Subscriber,这样我们就可以依赖于包装的Subscriber对象进行函数监控:

//output:
function call Thread[main,5,main]
onNext begin//开启监控
call onNext [str1]
onNext execute on next time = 5005ms//监控结束

也就是说,上述的例子中我们的流程图应该是:

lift后流程图
lift后流程图

好的,有了上面的概念,我们可以来看下OperatorObserveOn的代码,我们看下它给我们生成了一个什么样的订阅者:

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
     ....
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
  .....
    }

lift函数流程图
lift函数流程图

Lift函数执行完后,会将我们所注册的Subscriber装饰成为一个ObserveOnSubscriber对象。"lift后流程图"的红色框框部分以后注明了这个对象的功能。我们先来看下ObserveOnSubscriber对象的onCompleted/onError三个方法:

 @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        }

由于onCompletedonError是互斥的,且只会被调用一次,因此会用一个finishedboolean变量来进行拦截,然后调用schedule()函数来处理剩下逻辑:

final AtomicLong counter = new AtomicLong();
 protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
 }

由于counter在调用getAndIncrement()后就大于0,因此recursiveScheduler.schedule(this)只会被调用一次,recursiveScheduler的定义在ObserveOnSubscriber的构造器中:

 public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            ...
        }

scheduler就是我们传入的Schedulers.newThread()对象,实际上是一个NewThreadScheduler对象:

@Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);//recursiveScheduler的类型是一个NewThreadWorker
    }

可以看出,recursiveScheduler最终会被置为NewThreadWorker类型

public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
     ....
        executor = exec;
    }

NewThreadWorker构造器中,定义了一个核心线程为1ScheduledThreadPool线程池。(ScheduledThreadPool是一个很特殊的线程池,这个线程池的主要是为了支持延迟任务,或者定时任务。)recursiveScheduler.schedule(this)实际上就是调用NewThreadWorkerschedule(Action0)方法。

 @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed) {
            return Subscriptions.unsubscribed();
        }
        return scheduleActual(action, delayTime, unit);
    }

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
       ....
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
       ...
    }

schedule方法最终会调用到scheduleActual方法,action对象会被包装成为一个ScheduledActionRunable对象提交给线程池executor。而线程池会调用ScheduledActionrun()方法,在run()方法中,又会调用Action0call()方法:

@Override
    public void run() {
        try {
            .....
            action.call();
        } catch (OnErrorNotImplementedException e) {
              ....
        }
    }

如果刚才的代码已经把你给绕懵了,不要紧,我们再来回顾一下流程:

1. 我们通过lift函数注册了一个叫做OperatorObserveOnOperator对象

2. lift函数会构造一个叫做OnSubscribeLift的对象用于构造一个Observable对象

3. 当订阅者Subscriber对象订阅Observable的时候,根据调用链,会优先使用OnSubscribeLift对象作为优先处理对象。

4 OnSubscribeLift调用call(Subscriber)方法,在该类的call方法中,会通过内部的Operator对象(也就是OperatorObserveOn对象)的Subscriber call(Subscriber)方法,生成一个新的订阅者ObserveOnSubscriber

5. 新的订阅者对象ObserveOnSubscriber被OnSubscribeLift对象传递给上层的OnSubscribe对象处理,也就是走如RxJava源码解析(一)从一个例子开始)中的流程,最后会走到OnSubscribeFromArray对象中,然后遍历里面的数组生产者

6. OnSubscribeFromArray遍历数组中的成员,然后调用订阅者的onNextonCompleted。而最终要调用到的订阅者就是ObserveOnSubscriber对象。

7. ObserveOnSubscriber对象的onNext()onCompleted()方法会触发执行schedule()方法,schedule()方法会调用Scheduler.Worker.schedule(Action0)方法,而这个Action0对象就是ObserveOnSubscriber类型

8. 当我们选择 Schedulers.newThread()调度器的时候,Scheduler.Worker对象实际类型为NewThreadWorker对象,而NewThreadWorker.schedule(Action0)中会将Action0对象包装成为ScheduledAction对象,ScheduledAction本质是一个Runnable类型,因此它可以被提交到线程池中,调用ScheduledAction.run()方法,而ScheduledAction.run()方法中,又会调用Action0.call()

9. 步骤8中Action0实现的类型为ObserveOnSubscriber类型,此时调用ObserveOnSubscriber.call()方法会从queue队列中读取onNext参数值并检测是否已经结束,注意,由于当前函数是由我们调度器生成的Worker对象中的线程池调用的,因此当前的全部回调操作都发生在Worker所构建的线程中。

#####总结
实际上,我们从上面可以看出,我们通过lift函数所构造出来的ObserveOnSubscriber对象,实际上是生成了一个OnSubscriber的装饰对象。而这个对象的具体操作,都被封装到了call()方法中去,换句话说,我们的调度器实际上就是提供一个容器,给我们的call()方法提供上下文。基于我们上述的结论,我们实际上就可以写出我们自己的调度器:

private static class SchedulerImpl extends Scheduler {
        @Override
        public Worker createWorker() {
            // TODO Auto-generated method stub
            return new WorkerImpl();
        }
    }

private static class WorkerImpl extends Scheduler.Worker {
        @Override
        public void unsubscribe() {}
        @Override
        public boolean isUnsubscribed() {
                        return false;
        }

        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action,0,null);
        }
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            Thread thread = new Thread() {
                public void run() {
                    action.call();
                };
            };
            thread.setName("test");
            thread.start();
            return null;
        }
    }

这个调度器的写法非常的简单:
1.我们先构建一个Scheduler用于管理我们的Worker
2.observeOn会给我们提供一个ObserveOnSubscriber类型的Action0对象,作为参数调用Worker.schedule(Action0 action)方法
3.我们生成了一个独立的线程"test",并在线程中调用Action0.call ()方法,这样就可以将事件发送到我们所订阅的真正的Subscriber上了

最后输出日志:

output:
call onNext [str1]
call onNext [str2]
call onCompleted Thread[test,5,main]