just()示例
Observable.just("文章1", "文章2")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
Log.d(TAG, " onNext : " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, " onComplete");
}
})
Observable 的just()有10个重载方法,参数1~10个
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
同样的我们从以下几个方面分析源码:
- 被观察者Observable是何时创建
- 观察者Observer何时创建(很简单就一个接口,不多介绍)
- 被观察者与观察者如何建立subscribe订阅关系的
先来分析下一个参数的 just() 这个方法
1. 被观察者Observable是何时创建
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
//创建ObservableJust对象,封装成被观察者Observable
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
创建了 ObservableJust 对象,调用 RxJavaPlugins.onAssembly 返回了被观察者 Observable.
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
我们看下 ObservableJust 类,同样的也是继承 Observable。
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
//这个方法很重要, 等会会说到
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
这时候被观察者已经创建完成了,它就是 Observable 的子类 ObservableJust,我们在用 Observable.just() 的时候其实被观察者是 ObservableJust。
2. Observable 和 Observer 如何建立 subscribe 订阅关系的
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
npe.initCause(e);
throw npe;
}
}
这里的 Subscribe() 方法和 Observable.Create() 调的 Subscribe() 一样,我们知道 Subscribeactual() 这个方法是抽象方法,那它具体实现是不是和 Observable.Create() 一样也是 Observablecreate 类里呢?不是,不是,不是,重要的话说三遍哦,它的具体实现在我们看到 Observablejust 中。
那么我们回到 ObservableJust 中看下subscribeActual(Observer<? super T> observer)
protected void subscribeActual(Observer<? super T> observer) {
//创建了一个线程
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
//设置observer的回调方法onSubscribe
observer.onSubscribe(sd);
//执行线程
sd.run();
}
这里面创建了一个线程(Runnable),他就是 ScalarDisposable ,ScalarDisposable实现Runnable,把我们创建的观察者Observer 和参数value (Observable.just("文章1") 这里的‘文章1’) 作为构造方法的参数传进去了,
同时他也是Disposable的子类,所以 observer.onSubscribe(sd);
这行就很好理解了,就是设置了观察者的onSubscribe方法的回调,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
......
接下来看下 ScalarDisposable 类
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
//我们创建的观察者
final Observer<? super T> observer;
//我们在just中传递的参数(文章1)
final T value;
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
我们在subscribeActual()方法中看到最后执行了 sd.run(); 所以我们只需看 public void run() 这个方法, run()方法首先执行了 observer.onNext(value), 也就是说我们在创建Observable时传的参数此时发送给observer, 然后在执行observer.onComplete()。
这样,Observable.just() 一个参数的方法就结束了
在来看下多个参数的 just() 这个方法
同样的我们从以下几个方面分析源码:
1. 被观察者Observable是何时创建
我们下看源码:
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}
很简单,在往下看 fromArray()
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
我们看到fromArray()参数是 可变长度参数,也就是说参数可以为1个,当然了1个时直接调用Observable.just("文章1") 一个参数的just()。最后返回一个可变长度参数 items 构造的 ObservableFromArray 对象,他也继承了Observable,也就是说我们创建的被观察着就是 ObservableFromArray 对象。
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
......
2. 接下来我们看下subscribe方法
同样是
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
npe.initCause(e);
throw npe;
}
}
但是抽象方法subscribeActual()在ObservableFromArray中执行 在看ObservableFromArray的subscribeActual()这个方法
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
此时创建了FromArrayDisposable对象,参数是我们创建的观察者observer,和传递的可变长度的参数array,然后执行Observer中的onSubscribe()方法。最后执行了FromArrayDisposable的run()方法(注意他不是线程的run()方法)。
我们在看FromArrayDisposable的run()方法
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
变量downstream 就是我们创建的观察者 Observer ,array 就是我们传递的可变长度的那一串数组
run()方法中遍历array,然后执行回调 downstream.onNext(value) ,最后在执行回调 downstream.onComplete()。
根据上面的分析,我们得出如下规则:
1、通过 just() 方式 直接触发 onNext()
2、just 传进去什么,在onNext() 接收什么,如果我们传入 List,同样的在 onNext() 接收的也是 List,而不是 List 的 Item
3、onNext() 中接收数据的顺序是根据 just 传入的顺序确认的,使用 just 不允许传递 null,否则会出现异常
RxJava2-from操作符使用和分析
欢迎转载,但是请注明出处
示例源码
个人Github主页
如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^