RxJava 学习记 (三) — 1.x 线程调度器 Schedulers

2,007 阅读3分钟
原文链接: blog.csdn.net

简介


在没有给定调度器(Scheduler)的情况下,Subscription将默认(产生事件与订阅)运行于调用线程上。

线程调度器(Scheduler)是将RxJava从同步观察者模式转到异步观察者模式的一个重要工具。

RxJava提供了5种主要的调度器:

  • Scheduler Schedulers.io()
  • Scheduler Schedulers.computation()
  • Scheduler Schedulers.immediate()
  • Scheduler Schedulers.newThread()
  • Scheduler Schedulers.trampoline()

还有可用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form()
 

Schedulers.io()


内部创建一个rx.internal.schedulers.CachedThreadScheduler。底层实现是一个Java中的ScheduledThreadPoolExecutor (extends ThreadPoolExecutorimplements ScheduledExecutorService)

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
        new DelayedWorkQueue(), threadFactory);
}

corePoolSize=1, DEFAULT_KEEPALIVE_MILLIS=10L, DelayedWorkQueue是一个二叉树结构实现的BlockingQueue

整体还是一个无界(即容量特别大)的队列实现

例如,存储Bitmap到本地时,可以直接在Schedulers的io线程中执行任务:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    }); 
}

 

Schedulers.computation()


内部是由 rx.internal.schedulers.EventLoopsScheduler 实现的。

这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认
调度器: buffer()debounce()delay()interval()sample()skip()
 

Schedulers.immediate()


内部创建一个rx.internal.schedulers.ImmediateScheduler。 这个调度器允许你立即在当前线程执行指定的工作。

它是 timeout()timeInterval()timestamp() 方法默认的调度器
 

Schedulers.newThread()


内部创建一个rx.internal.schedulers.NewThreadScheduler。一底层跟Schedulers.io()一样是由java的ScheduledThreadPoolExecutor实现。

它为指定任务启动一个新的线程
 

Schedulers.trampoline()


内部创建一个rx.internal.schedulers.TrampolineScheduler。运行在当前线程。当有新任务时,并不会立即执行,而是将它加入队列PriorityBlockingQueue中,直到运行任务执行完成后,才从队列中按序取出一个继续执行。

它是repeat()retry()默认的调度器
 

用于测试的调度器Schedulers.test()


(some from blog.csdn.net/siguoyi/art…)

创建一个rx.schedulers.TestScheduler。这是一个公开的可访问的类。也可以直接使用无参构造方法,new出一个实例。

主要提供如下三个方法,来对调度器的时钟表现进行手动微调,这对依赖精确时间安排的任务的测试很有用处。

  • advanceTimeBy(time,unit) 将调度器时时钟,前进一个指定时间。这是相对操作
  • advanceTimeTo(time,unit) 将调度器时钟拨动到一个指定的时间。 这个是绝对操作
  • triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间

假定当前时间为0, 先advanceTimeBy(2, TimeUnit.SECONDS)再advanceTimeTo(2, TimeUnit.SECONDS),那么现在时间还是2。若反过来调用,那么现在时间就是4b了
 

自定义Scheduler—-Schedulers.form()


使用Schedulers.form(java.util.concurrent.Executor executor) ,来自定义Scheduler
 

subscribeOn()和observeOn()


subscribeOn()observeOn() 是用来指定事件生产与订阅在哪个线程执行的。

  • 默认没有定义observeOn、subscribeOn,即运行于当前线程
  • subscribeOn 指定 订阅事件发生(OnSubscribe)的线程
  • observeOn 指定 在其之后的所有事件发生的线程,即使后面出现了subscribeOn
  • 如果subscribeOn出现前,没有observeOn,这时,subscribeOn 指定 OnSubscribe 及它之前的所有事件

示例:

Action1 action = (Action1<String>) s -> 
    System.out.println("test-Observer: " + Thread.currentThread().getName() + ", " + s);

Observable.Transformer<Integer, String> transformer = integerObservable ->
                integerObservable.map((Func1<Integer, String>) integer ->
                        "test-tran.call: " + Thread.currentThread().getName() + ", " + integer);

Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
    System.out.println( "test-OnSubscribe.call: " + Thread.currentThread().getName());
    subscriber.onNext(9);      }).subscribeOn(io()).observeOn(AndroidSchedulers.mainThread()).compose(transformer).subscribe(action);

可自由变换subscribeOn、observeOn出现的位置,观察影响哪部分运行的线程