RxJava2-从源码角度分析create

811 阅读6分钟

示例

//1、创建一个被观察着(公众号),定义要发送的事件
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
	emitter.onNext("文章1");
	emitter.onNext("文章2");
	emitter.onNext("文章3");
	emitter.onComplete();
    }
});
//2、创建一个观察着(用户),接收事件并作出响应操作
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
	Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String str) {
	Log.d(TAG, "onNext : "+str);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
	Log.d(TAG, "onComplete");
    }
};
//建立用户和公众号的订阅关系
observable.subscribe(observer);

其实RxJava的核心思想就是观察者模式,只要理解这个,其实RxJava也不难。说白了就是要有观察者和被观察着,然后建立观察者和被观察者之间的关系。

  • 被观察者Observable(公众号)何时创建?
  • 观察者Observer(用户)何时创建?
  • 被观察者与观察者如何subscribe(用户关注了公众号)订阅?
1. 首先查看被观察者Observable类

被观察者Observable为抽象类 实现 ObservableSource接口

public abstract class Observable<T> implements ObservableSource<T> {
	...
	public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
	...
}

很简单,create方法里面就两行代码,先去判断source的是否为空,然后再去调用RxJavaPlugins.onAssembly(new ObservableCreate(source)),先 new 了 ObservableCreate 类,该类继承了 Observable,然后通过 RxJavaPlugins.onAssembly 方法返回 Observable 对象。

这样 Observable 就创建完成了,其实是创建了 Observable 的子类 ObservableCreate 对象,也就是真实的被观察着对象

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
	//这个方法很重要,Observable 的 subscribe 其实就执行的这个方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

2. 观察者 Observer
public interface Observer<T> {
	//订阅时回调
	void onSubscribe(@NonNull Disposable d);
	//成功回调
	void onNext(@NonNull T t);
	//错误回调
	void onError(@NonNull Throwable e);
	//完成时回调
	void onComplete();
}

这个类十分简单,是标准的函数式接口

3. 被观察者与观察者如何subscribe(用户关注了公众号)订阅

observable.subscribe(observer);

public final void subscribe(Observer<? super T> observer) {


    observer = RxJavaPlugins.onSubscribe(this, observer);

    subscribeActual(observer);

}

RxJavaPlugins.onSubscribe 此时直接返回Observer观察者对象,最后执行的是 subscribeActual() 方法,我们点进去看看

protected abstract void subscribeActual(Observer<? super T> observer);

是个抽象方法,我们来看看 Observable 子类 ObservableCreate 里面是不是有 subscribeActual(Observer<? super T> observer),又回到了 ObservableCreate 类。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

该方法的参数就是我们创建的观察者 Observer,这里创建了 CreateEmitter 对象,把我们的观察者 Observer 传到 CreateEmitter 的构造方法中。 CreateEmitter 类继承了 Disposable 接口:

 static final class CreateEmitter<T>  extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
		//最终的回调就是我们创建的观察者 Observer 的 onNext()
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
        //最终的回调就是我们创建的观察者 Observer 的 onError()
        @Override
        public boolean tryOnError(Throwable t) {
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
		//最终的回调就是我们创建的观察者 Observer 的 onComplete()
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    	......
    }


是不是看到了我们在创建观察者 Observer 时的几个回调方法了,

接下来observer.onSubscribe(parent);这行代码,其实就是我们创建观察者 Observer 时的

public void onSubscribe(Disposable d) 回调

	Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

     ......

的参数 Disposable d 就是 CreateEmitter 对象

再看 source.subscribe(parent); 这行代码,这个source就是我们创建被观察者 Observable(其实是它的子类 ObservableCreate)时 new 的 ObservableOnSubscribe,它只有一个 subscribe 方法,执行完这行代码,被观察者与观察者就订阅关系。

那么当我们在执行

 emitter.onNext("文章1");
 emitter.onNext("文章2");
 emitter.onNext("文章3");
 emitter.onComplete();

这几行代码的时候,也就是被观察者 Observable 通过CreateEmitter发送事件时,观察者 Observer 就会走相应的回调方法, 当此执行完 onComplete() 观察者收到完成回调,整个订阅流程就完成了。

Observable.subscribe()除了接收Observer参数外,还可以接收Consumer参数

示例

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, " Consumer : onNext : " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, " Consumer : onError");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, " Action : onComplete");
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, " Consumer : onSubscribe");
            }
        })

这里我们看到传入的是Consumer,我们来看看这个类:

public interface Consumer<T> {
    void accept(T t) throws Exception;
}

很简单,就是个普通的接口,里面只有一个accept方法

当使用Consumer作为subscribe()的参数时,最多可以接收4个回调参数,而且执行结果和subscribe(observer)的一样。 接下来我们看下Observable的subscribe()方法

public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                  Action onComplete) {
    return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                  Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

有五个重载方法,参数个数0到4个,参数名和接口Observer的方法名一样,从而我们可以猜测处理的事情应该和Observer每个方法的一样。我们看到 当参数少于4个时,就传入空的回调(不是null),那么我们直接看4个参数的subscribe()。

先分别判断参数是否为空,然后通过这个4个参数构造一个 LambdaObserver 对象,最后调用subscribe(ls),这个subscribe(ls)又是什么呢?我们点进去看看:

public final void subscribe(Observer<? super T> observer) {
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);

        npe.initCause(e);
        throw npe;
    }
}

嗯?似曾相识,对,就是我们认识的那个subscribe,参数就是上面创建的 LambdaObserver 对象,LambdaObserver继承了接口Observer,然后在传入 subscribeActual(observer) ,接下来的流程就和上面一样了。

我们在看 LambdaObserver 类

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
    //调用观察者的 onSubscribe()
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                d.dispose();
                onError(ex);
            }
        }
    }
    //调用观察者的 onNext()
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }
    //调用观察者的 onError()
    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    //调用观察者的 onComplete()
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onComplete.run();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
            }
        }
    }

}

构造方法里面就是我们传进去的四个参数,而且有四个方法我们似乎也很熟悉,对,他就是接口 Observer 的实现方法。这四个方法中分别执行了四个回调

onSubscribe.accept(Disposable);

onNext.accept(T);

onError.accept(Throwable);

onComplete.run();

也就是subscribe时传入的回调。

RxJava2-从源码角度分析just

欢迎转载,但是请注明出处

示例源码
个人Github主页

如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^