欢迎转载,但请务必在明确位置注明文章出处! http://johnnyshieh.github.io/android/2017/03/10/rxbus-rxjava2/
基于 RxJava 的 RxBus 作为一种事件总线,相信许多人都了解一些,Square 的 Otto 也因此弃用,因为现在 RxJava 太火了,用它几行代码就可以写出事件总线。不过大家所熟悉的是基于 RxJava 1.x 版本的,2016 年十月底 RxJava 更新到 2.x 版本了,具体变化请看 What’s different in 2.0 ,下面总结下适合不同场景的 RxJava 2 版本的 RxBus 写法。
-
没有背压处理(Backpressure)的 Rxbus
-
有背压处理的 RxBus
-
有异常处理的 Rxbus (订阅者处理事件出现异常也能继续收到事件)
1. 没有背压处理(Backpressure)的 Rxbus
在 RxJava 2.0 之后, io.reactivex.Observable
中没有进行背压处理了,如果有大量消息堆积在总线中来不及处理会产生 MissingBackpressureException
或者 OutOfMemoryError
,有新的类 io.reactivex.Flowable
专门针对背压问题。
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishSubject.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}
2. 有背压处理的 RxBus
import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
public class RxBus {
private final FlowableProcessor<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishProcessor.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Flowable<T> toFlowable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Flowable<Object> toFlowable() {
return mBus;
}
public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}
3. 有异常处理的 Rxbus
上面的两种 RxBus 在订阅者处理事件出现异常后,订阅者无法再收到事件,这是 RxJava 当初本身的设计原则,但是在事件总线中这反而是个问题,不过 JakeWharton 大神写了即使出现异常也不会终止订阅关系的 RxRelay ,所以基于 RxRelay 就能写出有异常处理能力的 Rxbus。
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
public class RxBus {
private final Relay<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishRelay.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}