阅读 2612

拥抱RxJava(番外篇):关于RxJava的Tips & Tricks

前言:

起初写 拥抱RxJava 系列文章。只是因为看到很多人在使用RxJava时候,并没有很正确的理解Reactive Programming。仅仅在项目中使用了Retrofit的Rx Adapter或者使用了一点点RxBus就写道自己的项目中用了RxJava,并以此传道。我觉得这样是不好的。所以写了这一系列更好的介绍RxJava。 但是可能个人的语言能力确实略差,每次都会让读者有些许误解。今天这篇我们就放松一下,分享一下我使用RxJava时的一些Tips & Tricks (本文在为指定背景的情况下,使用的是 RxJava 2.x 版本。 )

这篇文章很多借鉴了这个presentation中的一些技巧: Common RxJava Mistakes (视频需要科学上网)

0 RxJava 不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!

很多人使用RxJava仅仅是因为线程切换方便,或者是因为Retrofit提供了这样一个炫酷的返回方式,或者仅仅是因为RxJava这种链式调用很炫酷,又或是因为大家都用RxJava我不用怕跟不上节奏。
如果你使用RxJava仅仅是因为如上几个原因,我建议你放弃RxJava。因为RxJava给你带来的坑将远多于你从RxJava中获得的便利。曾经有一个老爷爷说过一句话:

With Great Power Comes Great Responsibility
力量越大,责任越大

RxJava能做的远不仅仅是切换线程或者简单的变换(map()filter()等等),与之相伴的就是数不尽的大坑。所以当然我这篇不可能覆盖所有的 RxJava 的坑。如果你想使用RxJava,回头问一下自己使用这个库的初衷,究竟是得到得多还是失去的多?

1 observeOn vs subscribeOn

这两个操作符可能对很多人是最常用的两个了。 然而这中间也有很多大坑在这里。

1.1 subscribeOn 控制上游,observeOn控制下游

很多人误以为 subscribeOn 控制的是Observable的生成的线程而observeOn控制的是 subscribe() 方法发生的线程。 当然,这点你们可以怪扔物线大神,毕竟他在他对RxJava最著名的帖子中写道:

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。 ——扔物线

这个观点不能说是错的(这点我稍后再将),但是更多的是 subscribeOn控制整个上游,而observeOn控制整个下游。 举个例子:

Observable.just("1","2","3")
    .map(x -> x.length())
    .subscribeOn(Schedulers.io())
    .flatMap(x -> Observable.just(x,"pause"))
    .observeOn(Schedulers.computation())
    .map(x -> someHeavyCaculation(x))
    .subscribe(x -> Log.d(TAG, x));复制代码

这段代码中各个操作符是在哪些线程中进行的?
我们看下答案:

Observable.just("1","2","3") //IO 线程
    .map(x -> x.length()) //IO 线程
    .subscribeOn(Schedulers.io())
    .flatMap(x -> Observable.just(x,"pause")) //IO 线程
    .observeOn(Schedulers.computation())
    .map(x -> someHeavyCaculation(x)) //computation 线程
    .subscribe(x -> Log.d(TAG, x)); //computation 线程复制代码

所以我们看到了,observeOn 后面的所有操作都会在这个线程工作。subscribeOn 会从这个Observable生成一直到遇到其他 observeOn。所以 observeOnsubscribeOn 的位置非常关键。

当然,这点问题 扔物线大神在文章中也详细的讲到了:

因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 ——扔物线

只不过很多人看帖子看一半,扔物线大神把这最重要的部分放到了lift()后面讲,很多人看完枯燥的lift()后,选择性忽视了最后关于 Scheduler非常关键的这部分。

1.2 subscribeOn只发生一次,observeOn可以使用多次

如果程序需要多次切换线程,使用多次observeOn是完全可以的。 而subscribeOn只有最上方的subscribeOn会起作用。这点扔物线大神的文章也补充过,大家可以回头再重温一下。

1.3 不是所有操作符都会在默认线程执行

很多操作符的默认执行线程并不是当前线程,这类操作符有一个特征就是会提供带有 Scheduler 参数的重载方法,比如 intervalinterval 会默认在computation线程执行,如果你在后面加上subscribeOn。 他还是会在computation线程执行,你只有在重载方法里加入其他 Scheduler,他才会在其他线程执行。如果你仔细看过 RxJava 的 JavaDoc。 他都会明确写出这个操作符的默认工作线程。

2 如果可能,避免使用Subject 当然包括RxBus!!

subject 作为Observable的结合体,在使用时非常方便。但是在使用时,很多时候并不尽人意。

2.1 Subject 的行为是不可预期的

Subject 由于暴露 onNext 方法。非常难控制。任何有这个subject引用的对象都可以使用这个方法传输数据,任何订阅了subject的人都可以接收到这个数据。
这导致你订阅Subject后几乎不清楚数据来源到底是谁。甚至也不知道你收到的到底是什么数据。这也是为什么我强烈抵制 RxBus 的一大原因。
Subject 由于自己是Observable, 他遵循Observable Contract。 如果其中某个事件出现异常,onError触发,那么这个Subject将再也不能使用。当然这点可以使用 Jake Wharton的 RxRelay来解决。 RxRelay就是一个没有onComplete和onError的Subject。
所以如果你的程序中必须使用Subject, 推荐将其设为 private field并且对外只暴露他的Observable形式。

2.2 Subject 默认是Hot Observable

关于 hot/cold Observable我在这篇文章中详细的解释过: 拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区

Subject默认是热的,也就是说你发送的信息接收者是否接受的到是不一定的。是需要根据情况分析的。具体可以看我关于hot/cold Observable 的文章。

2.3 Again。 不要在继续使用RxBus了

RxBus 几乎都是基于Subject的再次封装。使得他不仅拥有了Subject是所有缺点还加入了很多缺点,比如他不是类型安全的。 我见过太多的RxBus封装都只是一个接受Object类型的Subject。这个问题当然也有很多RxBus通过 键值对或ofType()等等操作解决。再比如RxBus更容易造成内存泄漏(因为需要将所有事件和订阅者存储在Subject中,)。更多欢迎再次看一下我的第一篇关于RxJava的文章: 放弃RxBus,拥抱RxJava(一)

前几天我在Reddit上看到一个人的回复:

I think EventBus on android is popular because people don't know how to share a java object reference between android components like a Fragment and an Activity, or 2 Activities and so on. So basically I think people don't know how 2 Activites can observe the same object for data changes which I think comes from the fact that we still don't know how to architect our apps properly.

我认为 EventBus在Android上火爆的原因是人们不知道怎么去在Android组件,例如Activity/Fragment之间共享一个Java对象的引用。

这个回复可以说应该是触到了很多人的痛点。很多情况我们用EventBus仅仅是不知道如何在多个Fragment/Activity之间共享一个对象。EventBus的做法是在Bus里登记所有的接受者。这点在RxJava里类似,Subject/ConnectableObservable 都有类似的功能。但问题是EventBus作为一个全局Bus,各种不同类型的事件管理会很麻烦(虽然EventBus把这些事给你做好了,RxBus要自己弄)。我们有了RxJava完全可以避免不同事件的管理。相同事件封装成对应Observable,根据需求选择订阅。这样保持了类型安全,提高了性能,逻辑更清晰。

想一想,自己使用EventBus是不是也是这个原因呢?

3 如果你还在使用RxJava 1.x 建议尽快升级2.x版本

RxJava 2.x 更新了很多新内容。比如将Backpressure机制分离出来做成Flowable等等。 而且RxJava 1.x马上要寿终正寝,进入不再更新的模式(2017年6月)。所以还在使用RxJava 1.X 的同学们尽快更新吧。
如果你仍然处于某种原因,必须使用RxJava 1.x, 那么也千万不要使用Observable.create(Observable.OnSubscribe<T> f) 操作符(现已经被Deprecated)创建Observable。使用其他工厂方法或者直接升级为RxJava 2.x (现已经更新到2.1.0)才是正确的选择。

4 关于操作符

4.1 尽量避免过多的使用操作符,能合并的操作符尽量合并。

这里的合并不是指使用ObservableTransformer合并。而是指在逻辑上合并,比如:

    Observable.just("Hello","World","RxJava")
              .map(x -> x.length())
              .map(x -> x + 2)
              .subscribe(/**********/)复制代码

这里的两个map明显可以写成一个。 我们知道,每个操作符都会根据操作符的特性生成新的Observable,订阅他的上游然后给下游发送数据,避免使用过多的操作符可以降低内存抖动。
所以我不是很推荐使用ObservableTransformer来合并出来一个

ObservableTransformer applyThread = upstream ->
    upstream.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());复制代码

这样虽然在写法上更简单了。但是损失了observeOn的灵活性还额外增加OverHead。得不偿失。当然,如果我们使用Transformer来进行模块解耦,这当然是非常值得的。详细可以参考我的上一篇文章:
动手做一个Full Rx App

4.2 flatMap并不保证发射顺序。

flatMap是将他每个得到的item转换成一个Observable,然后通过merge融合这些Observable。但是每个对应的Observable发射出去的一个或多个项目并不是完全有序的。如果想要保证发射顺序,使用concatMap。同理,merge操作符也不保证顺序,如果需要有序,使用concat

4.3 如果不是必要,不要在flatMap中使用过多的操作符。

我们刚才说了,每个item都会生成一个新的Observable,每个操作符也会。所以如果你的flatMap中有其他操作符,比如下面的代码:

Observable.fromIterable(list)
    .flatMap(x -> Observable.just("x",x,"y")
        .map(item -> "item" + item))
    .subscribe();复制代码

如果你的list中有上万个item。 那么你将会调用这个map上万次,多生成上万个ObservableMap来进行这个操作。 我们可以简单的消除这个OverHead。 将map拿出来,如下:

Observable.fromIterable(list)
    .flatMap(x -> Observable.just("x",x,"y"))
    .map(item -> "item" + item)
    .subscribe();复制代码

这样flatMap后的Observable会统一进行管理。省去了那上万个ObservableMap。

4.4 如果不是必要,不要自己写 Observable 的操作符。

Observable的每个操作符都有着很复杂的逻辑,就连很多RxJava的专家都会出错。如果你真的想写自己的操作符,我建议你首先阅读这个文章:
Writing operators for 2.0
详细的介绍了如何写操作符,要遵顼哪些规则。 顺便一提,这个文章有将近1700行,还有三个主要模块处于TBD阶段,并没有完全补充。写操作符的难度可想而知。

4.5 使用ignoreElements().andThen()来进行多步操作。

很多时候我们的一套流程不仅仅包含一个操作。比如我们想通过一个List<page>来加载几个页面。加载后我们想进行其他操作,但又不想破坏整个链条结构。我们就可以通过ignoreElements().andThen()的结合:

        Observable.fromIterable(list)
                //在这里进行载入页面
                .doOnNext(item -> loadPage(item))
                //ignoreElements会提供给你一个Completable
                .ignoreElements()
                //andThen触发证明上游的Completable已经结束。onComplete触发,这是转而进行andThen里的操作
                .andThen(Observable.just("Complete"))
                //进行其他操作
                .subscribe(x -> System.out.println(x + "已经结束载入所有页面"));复制代码

小彩蛋:RxJava和Kotlin我最近碰到的一个坑

最近使用RxJava和Kotlin。 我将

startWith(idleState());复制代码

写成了

startWith{idleState()};复制代码

在我的IDE几乎肉眼难以分辨的区别。由于kotlin的lambda规则,{} 把我需要的变量解析成了lambda表达式,又由于正好是单参数的lambda,可以省略参数转换为 it 的写法。导致我这句仍然可以编译,但重载了错误的操作符。导致整个链条崩溃。那些年和我说kotlin语法更为安全的人你过来我给你加个BUFF!