该篇对rxjava3.0操作符源码进行分析, 这里我以简单的onFilter()
操作符进行讲解。以妈妈叫孩子吃饭为例子, 代码入下:
private void callEatSimpleOperator() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
if(mTag==1) {
emitter.onNext("baby ,it's time for lunch");
mTag = 0;
}
else {
emitter.onNext("hello world");
mTag = 1;
}
}
}).filter(new Predicate<String>(){
@Override
public boolean test(String str) {
if(“baby ,it's time for lunch”.equals(str))
return true;
return false;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
}
当妈妈叫孩子吃饭的时候,会走到filter的回调方法中, 判断是否为“baby ,it's time for lunch”字符串, 如果是的话返回true, 会继续把事件进行传递,否则返回false,将终端事件的传递。
源码分析
下面来看filter源码
- 创建ObservableFilter对象
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
它会返回ObservableFilter对象,来看它的类声明及构造方法。
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
...
可以看到ObservableFilter类继承自AbstractObservableWithUpstream抽象类,看源码
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
可以看到AbstractObservableWithUpstream是继承自Observable的。而传入进来的source对象正是上游的Observable对象,Observable类也是实现了ObservableSource
接口。
- 订阅
当订阅发生后, 前文分析到它会执行到Observable的
subscribeActual
方法。 来看ObservableFilter.subscribeActual
方法。
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
可以看到它生成并订阅了一个FilterObserver(Observer<? super T> actual, Predicate<? super T> filter)
对象,并传入了下游的Observer对象及filter的回调接口方法。
static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter;
FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
首先看FilterObserver构造方法中会执行super(actual)
它执行了this.downstream = downstream
也就是将上面传下来的Observer对象赋值给下游Observer downstream对象。
这时候订阅发生后会执行到FilterObserver.onNext
可以看到会执行到b = filter.test(t)
当返回true 会执行downstream.onNext(t)
下游的onNext方法,否则不进行处理,事件中断传输,到此对filter
操作符就分析完了。下面是增加filter操作符的时序图的整体订阅过程。
时序图
下面是rxjava 事件订阅及操作符的一个结构图
结构图
事件流转:
ObservableCreate.emitter
发射事件Operator01Observable.Operator01Observer
收到事件Operator01Observable
将事件发出去Operator02Observable.Operator02Observer
收到事件Operator02Observable
将事件发出去- 最终
Observer
收到事件
总结
每一个操作符都继承于Observable
, 内部都是有一个Observer
内部类,它对上游Observer
进行订阅,当Observer收到上游事件后,接下来会由自身发射事件给下游,最后由最下游的Observer
收到。