RxJava2源码分析——订阅

571 阅读5分钟

本文章主要是对RxJava2订阅流程进行源码分析,先说下我用的RxJavaRxAndroid版本,版本如下:

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

我们先写段示例代码,为了方便理解,我就不用上Lambda链式调用了,代码如下:

// 创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        Log.i("TanJiaJun", "subscribe");

        emitter.onNext("Tan");
        emitter.onNext("Jia");
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});

// 创建观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("TanJiaJun", "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.i("TanJiaJun", "onNext:" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("TanJiaJun", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("TanJiaJun", "onComplete");
    }
};

// 订阅
observable.subscribe(observer);

分成三步:

  1. 创建被观察者(Observable)。
  2. 创建观察者(Observer)。
  3. 调用被观察者的subscribe方法,传入观察者,将两者进行关联并且订阅。

源码分析

我们先从subscribe方法入手,代码如下:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    // 判断observer是不是空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // 调用子类的subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

看下RxJavaPlugins.onSubscribe方法,代码如下:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

注释说这个方法会调用关联的钩子函数(hook function),我们看到它会判断一下onObservableSubscribe是不是空,这个变量是通过setOnObservableSubscribe方法赋值的,代码如下:

@SuppressWarnings("rawtypes")
public static void setOnObservableSubscribe(
        @Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
}

然而我们没有调用这个方法,所以这里空的,直接返回observer

我们接着往下看,subscribeActual是个很重要的方法,它是个接口来的,Observable的子类都要去实现这个方法,接下来在讲创建被观察者的时候就会遇到。

我们调用Observable.create方法,代码如下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins.onAssembly方法也是一个钩子函数,代码如下:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

它会判断onObservableAssembly变量是不是空,这个变量是通过setOnObservableAssembly方法赋值的,代码如下:

@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

然而我们没有调用这个方法,所以我们直接看创建的ObservableCreate对象,要注意的点我都写上注释了,代码如下:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    // source在我们的示例代码里是上游Observable对象(被观察者)
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 创建CreateEmitter对象,传入下游Observer对象(观察者)
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 调用下游Observer对象的onSubscribe方法,并且传入CreateEmitter对象
        observer.onSubscribe(parent);

        try {
            // 调用上游Observable对象的subscribe方法,并且传入CreateEmitter对象
            source.subscribe(parent);
            // 这里可以得出结论,先执行下游Observer的onSubscribe方法,然后执行上游Observable的subscribe方法
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            // 发射一个错误的事件
            parent.onError(ex);
        }
    }

    // 该类继承了AtomicReference,可以实现原子操作
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        // 传入下游Observer对象
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            // 在RxJava2.x版本中,onNext方法不能传null,否则抛出空指针异常
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            // 当isDisposed方法为false时,调用下游observe的onNext方法,并且传入对应的对象
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            // 在RxJava2.x版本中,onError方法不能串null,否则抛出空指针异常
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            // 当isDisposed方法返回false时,调用下游observer的onError方法,并且传入Throwable对象,然后调用dispose方法
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            // 当isDispoesed方法返回false时,调用下游observer的onComplete方法,然后调用dispose方法
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

    // 省略部分代码
}

传入的是ObservableOnSubscribe接口,里面有个带ObservableEmitter参数的subscribe方法,代码如下:

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;

}

我们示例代码实现了这个方法,代码如下:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        Log.i("TanJiaJun","subscribe");

        emitter.onNext("Tan");
        emitter.onNext("Jia");
        emitter.onNext("Jun");
        emitter.onComplete();
    }
});

依次调用了ObservableEmitteronNext方法和onComplete方法,这里的ObservableEmitter实现类是CreateEmitter,代码如下:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        // parent是CreateEmitter
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

调用onNext方法和onComplete方法,实际上是调用了下游ObserveronNext方法和onComplete方法,代码如下:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer是下游观察者
        observer.onNext(t);
    }
}
@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            // observer是下游观察者
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

也就是调用了我们示例代码中这些方法,代码如下:

@Override
public void onNext(String s) {
    Log.i("TanJiaJun",s);
}
@Override
public void onComplete() {
    Log.i("TanJiaJun","onComplete");
}

总结一下,整个流程如下:

  1. 调用上游Observablesubscribe方法,并且传入下游Observer
  2. subscribe方法里面执行了Observable的子类ObservableCreatesubscribeActual方法,并且传入下游Observer
  3. subscribeActual方法里面会依次执行下游ObserveronSubscribe方法和ObservableOnSubscribesubscribe方法,从而完成整个订阅流程。
  4. 如果我们去发射事件,例如示例代码中调用ObservableEmitteronNext方法和onComplete方法,那么下游ObserveronNext方法和onComplete方法就会执行。

我的GitHub:TanJiaJunBeyond

Android通用框架:Android通用框架

我的掘金:谭嘉俊

我的简书:谭嘉俊

我的CSDN:谭嘉俊