RxJava 2 版本的 Rxbus

5,238 阅读2分钟
原文链接: www.tuicool.com

欢迎转载,但请务必在明确位置注明文章出处! http://johnnyshieh.github.io/android/2017/03/10/rxbus-rxjava2/

基于 RxJava 的 RxBus 作为一种事件总线,相信许多人都了解一些,Square 的 Otto 也因此弃用,因为现在 RxJava 太火了,用它几行代码就可以写出事件总线。不过大家所熟悉的是基于 RxJava 1.x 版本的,2016 年十月底 RxJava 更新到 2.x 版本了,具体变化请看 What’s different in 2.0 ,下面总结下适合不同场景的 RxJava 2 版本的 RxBus 写法。

  • 没有背压处理(Backpressure)的 Rxbus

  • 有背压处理的 RxBus

  • 有异常处理的 Rxbus (订阅者处理事件出现异常也能继续收到事件)

1. 没有背压处理(Backpressure)的 Rxbus

在 RxJava 2.0 之后, io.reactivex.Observable 中没有进行背压处理了,如果有大量消息堆积在总线中来不及处理会产生 MissingBackpressureException 或者 OutOfMemoryError ,有新的类 io.reactivex.Flowable 专门针对背压问题。

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {

    private final Subject<Object> mBus;

    private RxBus() {
        // toSerialized method made bus thread safe
        mBus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}

2. 有背压处理的 RxBus

import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;

public class RxBus {

    private final FlowableProcessor<Object> mBus;

    private RxBus() {
        // toSerialized method made bus thread safe
        mBus = PublishProcessor.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Flowable<T> toFlowable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Flowable<Object> toFlowable() {
        return mBus;
    }

    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}

3. 有异常处理的 Rxbus

上面的两种 RxBus 在订阅者处理事件出现异常后,订阅者无法再收到事件,这是 RxJava 当初本身的设计原则,但是在事件总线中这反而是个问题,不过 JakeWharton 大神写了即使出现异常也不会终止订阅关系的 RxRelay ,所以基于 RxRelay 就能写出有异常处理能力的 Rxbus。

import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;

public class RxBus {

    private final Relay<Object> mBus;

    private RxBus() {
        // toSerialized method made bus thread safe
        mBus = PublishRelay.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}