RxJava2-从源码角度分析just

570 阅读5分钟

just()示例

    Observable.just("文章1", "文章2")
              .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, " onSubscribe : " + d.isDisposed());
                    }
                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, " onNext : " + value);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, " onError : " + e.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, " onComplete");
                    }
               })

Observable 的just()有10个重载方法,参数1~10个

public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

同样的我们从以下几个方面分析源码:

  • 被观察者Observable是何时创建
  • 观察者Observer何时创建(很简单就一个接口,不多介绍)
  • 被观察者与观察者如何建立subscribe订阅关系的

先来分析下一个参数的 just() 这个方法

1. 被观察者Observable是何时创建
 public static <T> Observable<T> just(T item) {
     ObjectHelper.requireNonNull(item, "The item is null");
	//创建ObservableJust对象,封装成被观察者Observable
     return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

创建了 ObservableJust 对象,调用 RxJavaPlugins.onAssembly 返回了被观察者 Observable.

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们看下 ObservableJust 类,同样的也是继承 Observable。

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
	//这个方法很重要, 等会会说到
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

这时候被观察者已经创建完成了,它就是 Observable 的子类 ObservableJust,我们在用 Observable.just() 的时候其实被观察者是 ObservableJust。

2. Observable 和 Observer 如何建立 subscribe 订阅关系的
public final void subscribe(Observer<? super T> observer) {
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);

        npe.initCause(e);
        throw npe;
    }
}

这里的 Subscribe() 方法和 Observable.Create() 调的 Subscribe() 一样,我们知道 Subscribeactual() 这个方法是抽象方法,那它具体实现是不是和 Observable.Create() 一样也是 Observablecreate 类里呢?不是,不是,不是,重要的话说三遍哦,它的具体实现在我们看到 Observablejust 中。

那么我们回到 ObservableJust 中看下subscribeActual(Observer<? super T> observer)

protected void subscribeActual(Observer<? super T> observer) {
	//创建了一个线程
    ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
	//设置observer的回调方法onSubscribe
    observer.onSubscribe(sd);
	//执行线程
    sd.run();
}

这里面创建了一个线程(Runnable),他就是 ScalarDisposable ,ScalarDisposable实现Runnable,把我们创建的观察者Observer 和参数value (Observable.just("文章1") 这里的‘文章1’) 作为构造方法的参数传进去了,

同时他也是Disposable的子类,所以 observer.onSubscribe(sd); 这行就很好理解了,就是设置了观察者的onSubscribe方法的回调,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。

	Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

     ......

接下来看下 ScalarDisposable 类

public static final class ScalarDisposable<T> extends AtomicInteger  implements QueueDisposable<T>, Runnable {

    private static final long serialVersionUID = 3880992722410194083L;
	//我们创建的观察者
    final Observer<? super T> observer;
	//我们在just中传递的参数(文章1)
    final T value;

    static final int START = 0;
    static final int FUSED = 1;
    static final int ON_NEXT = 2;
    static final int ON_COMPLETE = 3;

    public ScalarDisposable(Observer<? super T> observer, T value) {
        this.observer = observer;
        this.value = value;
    }


    @Override
    public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

我们在subscribeActual()方法中看到最后执行了 sd.run(); 所以我们只需看 public void run() 这个方法, run()方法首先执行了 observer.onNext(value), 也就是说我们在创建Observable时传的参数此时发送给observer, 然后在执行observer.onComplete()。

这样,Observable.just() 一个参数的方法就结束了

在来看下多个参数的 just() 这个方法

同样的我们从以下几个方面分析源码:

1. 被观察者Observable是何时创建

我们下看源码:

public static <T> Observable<T> just(T item1, T item2) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");

    return fromArray(item1, item2);
}

很简单,在往下看 fromArray()

public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else
    if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

我们看到fromArray()参数是 可变长度参数,也就是说参数可以为1个,当然了1个时直接调用Observable.just("文章1") 一个参数的just()。最后返回一个可变长度参数 items 构造的 ObservableFromArray 对象,他也继承了Observable,也就是说我们创建的被观察着就是 ObservableFromArray 对象。

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }
	......

2. 接下来我们看下subscribe方法

同样是

public final void subscribe(Observer<? super T> observer) {
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);

        npe.initCause(e);
        throw npe;
    }
}

但是抽象方法subscribeActual()在ObservableFromArray中执行 在看ObservableFromArray的subscribeActual()这个方法

@Override
public void subscribeActual(Observer<? super T> observer) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

    observer.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

此时创建了FromArrayDisposable对象,参数是我们创建的观察者observer,和传递的可变长度的参数array,然后执行Observer中的onSubscribe()方法。最后执行了FromArrayDisposable的run()方法(注意他不是线程的run()方法)。

我们在看FromArrayDisposable的run()方法

static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

    final Observer<? super T> downstream;

    final T[] array;

    FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.downstream = actual;
        this.array = array;
    }

    void run() {
        T[] a = array;
        int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                downstream.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            downstream.onNext(value);
        }
        if (!isDisposed()) {
            downstream.onComplete();
        }
    }
}

变量downstream 就是我们创建的观察者 Observer ,array 就是我们传递的可变长度的那一串数组
run()方法中遍历array,然后执行回调 downstream.onNext(value) ,最后在执行回调 downstream.onComplete()。

根据上面的分析,我们得出如下规则:

1、通过 just() 方式 直接触发 onNext()

2、just 传进去什么,在onNext() 接收什么,如果我们传入 List,同样的在 onNext() 接收的也是 List,而不是 List 的 Item

3、onNext() 中接收数据的顺序是根据 just 传入的顺序确认的,使用 just 不允许传递 null,否则会出现异常



RxJava2-from操作符使用和分析

欢迎转载,但是请注明出处

示例源码
个人Github主页

如果对您有帮助,您可以 "Star" 支持一下哦, 谢谢! ^^