rxjava3.0入门到精通系列(三) 操作符源码分析

494 阅读3分钟

该篇对rxjava3.0操作符源码进行分析, 这里我以简单的onFilter()操作符进行讲解。以妈妈叫孩子吃饭为例子, 代码入下:

private void callEatSimpleOperator() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                if(mTag==1) {
                    emitter.onNext("baby ,it's time for lunch");
                    mTag = 0;
                }
                else {
                    emitter.onNext("hello world");
                    mTag = 1;
                }
            }
        }).filter(new Predicate<String>(){

            @Override
            public boolean test(String str) {
                if(“baby ,it's time for lunch”.equals(str))
                    return true;
                return false;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });
    }

当妈妈叫孩子吃饭的时候,会走到filter的回调方法中, 判断是否为“baby ,it's time for lunch”字符串, 如果是的话返回true, 会继续把事件进行传递,否则返回false,将终端事件的传递。

源码分析

下面来看filter源码

  1. 创建ObservableFilter对象
    public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }

它会返回ObservableFilter对象,来看它的类声明及构造方法。

  public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
    final Predicate<? super T> predicate;
    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source);
        this.predicate = predicate;
    }
    ...

可以看到ObservableFilter类继承自AbstractObservableWithUpstream抽象类,看源码

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

可以看到AbstractObservableWithUpstream是继承自Observable的。而传入进来的source对象正是上游的Observable对象,Observable类也是实现了ObservableSource接口。

  1. 订阅 当订阅发生后, 前文分析到它会执行到Observable的subscribeActual方法。 来看ObservableFilter.subscribeActual方法。
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

可以看到它生成并订阅了一个FilterObserver(Observer<? super T> actual, Predicate<? super T> filter)对象,并传入了下游的Observer对象及filter的回调接口方法。

static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
        final Predicate<? super T> filter;

        FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
            super(actual);
            this.filter = filter;
        }

        @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                if (b) {
                    downstream.onNext(t);
                }
            } else {
                downstream.onNext(null);
            }
        }

首先看FilterObserver构造方法中会执行super(actual) 它执行了this.downstream = downstream 也就是将上面传下来的Observer对象赋值给下游Observer downstream对象。 这时候订阅发生后会执行到FilterObserver.onNext可以看到会执行到b = filter.test(t) 当返回true 会执行downstream.onNext(t) 下游的onNext方法,否则不进行处理,事件中断传输,到此对filter操作符就分析完了。下面是增加filter操作符的时序图的整体订阅过程。

时序图

下面是rxjava 事件订阅及操作符的一个结构图

结构图

事件流转:

  1. ObservableCreate.emitter发射事件
  2. Operator01Observable.Operator01Observer收到事件
  3. Operator01Observable将事件发出去
  4. Operator02Observable.Operator02Observer收到事件
  5. Operator02Observable将事件发出去
  6. 最终Observer收到事件

总结

每一个操作符都继承于Observable, 内部都是有一个Observer内部类,它对上游Observer进行订阅,当Observer收到上游事件后,接下来会由自身发射事件给下游,最后由最下游的Observer收到。