拆轮子系列:RxJava

1,199 阅读10分钟
原文链接: www.jianshu.com

目录

整体思路

根据对RxJava使用的基本认识,个人觉得解析RxJava关键在于抓住以下几个问题:

  1. 事件流源头(observable)怎么发出数据
  2. 响应者(subscriber)怎么收到数据
  3. 操作符如何运作(operator/transformer)
  4. 整个过程的调度(scheduler)

需要说明的一点是,本文基于RxJava1.3.0,RxJava当前最新版本已经升级到了 2.2.4,后续会单开文章讲述版本之间的变化。

在具体讲述之前,先来介绍RxJava核心的三个类:

Observable

先来看一下源码中的说明:The Observable class that implements the Reactive Pattern.

它其实是一次观察者模式实现的调度者。所谓一个观察者模式在RxJava中指的是一次subscribe。

一次subscribe的实质可以抽象成下述代码,这个抽象很重要,后续的一系列变换都是基于这个抽象来做的:

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        this.onSubscribe.call(subscriber);
    }
}

OnSubscribe

同样的,先来看一下OnSubscribe的官方说明:

/**
* Invoked when Observable.subscribe is called.
* @param <T> the output value type
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

public interface Action1<T> extends Action {
    void call(T t);
}

Subscriber

Subscriber是接口Observer的抽象子类,

public interface Observer<T> {

    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

RxJava应用及一次订阅的流程分析

我们先来看一下RxJava的一个基本示例,然后以此为引子,进行整个流程的追踪和分析.

这个过程很简单,通过Observable.just发射数据,经过一次map转换,经过subscribeOn、observeOn切换线程,最后通过subscribe实现订阅。

Observable
    .just("Observable.create! User Observable.just!")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return "Observable.create! User Observable.map!";
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
             Log.i("RxJava", "print message: " + s);
        }
    });

简单梳理一下整个过程的对象转换关系如下:

just

先来看一下just的调用过程

public static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

public static <T> ScalarSynchronousObservable<T> create(T t) {
    return new ScalarSynchronousObservable<T>(t);
}

protected ScalarSynchronousObservable(final T t) {
    super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
    this.t = t;
}

从代码中可以看出,其核心过程是:

  1. 我们创建的是 ScalarSynchronousObservable,一个 Observable 的子类;
  2. ScalarSynchronousObservable的构造函数中传入了一个JustOnSubscribe类,这是一个OnSubscribe的实现类。

这里我们可以这么理解,Observable的构造函数传入了一个OnSubscribe,这是一个回调,它有一个回调方法void call(T t); 这里我们先记住这个call回调,后面再把整个过程串起来。

看一下JustOnSubscribe的具体实现:

static final class JustOnSubscribe<T> implements OnSubscribe<T> {
    final T value;

    JustOnSubscribe(T value) {
        this.value = value;
    }

    @Override
    public void call(Subscriber<? super T> s) {
        s.setProducer(createProducer(s, value));
    }
}

static <T> Producer createProducer(Subscriber<? super T> s, T v) {
    // ...
    return new WeakSingleProducer<T>(s, v);
}

我们再来看WeakSingleProducer的源码,在request方法中,可以看到调用了onNext() 和 onComplete(),这样,just中的数据就被创造并传递出来了。

static final class WeakSingleProducer<T> implements Producer {
    // ...
    
    @Override
    public void request(long n) {
        // 省略状态检查代码
        Subscriber<? super T> a = actual;
        if (a.isUnsubscribed()) {
            return;
        }
        T v = value;
        try {
            a.onNext(v);
        } catch (Throwable e) {
            Exceptions.throwOrReport(e, a, v);
            return;
        }

        if (a.isUnsubscribed()) {
            return;
        }
        a.onCompleted();
    }
}

map

map它是一种转换,将上游输入的数据转换之后,传递到下游。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    ...
    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
}

OnSubscribeMap类是OnSubscribe的子类,unsafeCreate()方法就是通过传入的OnSubscribe构造一个Observable。这一点和just方法本质上是一样的,通过OnSubscribe构造一个Observable实例。所以Map的本质就是将一个Observable转换成另外一个Observable,期间会回调call方法。 那么,map的call方法具体做了什么呢?

  1. 创建了一个MapSubscriber;
  2. 将MapSubscriber加入到Subscriber的父链中;
  3. 修正订阅关系,source Observable订阅的是MapSubscriber,意思是在map之前订阅的是subscriberA,此时订阅的就是新的MapSubscriber,而MapSubscriber是subscriberA的parent,它们会有一个嵌套关系

MapSubscriber的源码如下,其过程还是比较直接的:

  1. 上游每新来一个数据,就用我们给的 mapper 进行数据转换。
  2. 再把转换之后的数据发送给下游。
static final class MapSubscriber<T, R> extends Subscriber<T> {

    ...
    public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

    @Override
    public void onNext(T t) {
        R result;

        try {
            result = mapper.call(t);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            unsubscribe();
            onError(OnErrorThrowable.addValueAsLastCause(ex, t));
            return;
        }

        actual.onNext(result);
    }  
}

subscribe

下面我们再来看subscribe的过程,这是Subscriber对OnSubscribe的订阅过程。

public final Subscription subscribe(final Action1<? super T> onNext) {
    // 省略参数检查代码
    Action1<Throwable> onError = 
        InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    Action0 onCompleted = Actions.empty();
    return subscribe(new ActionSubscriber<T>(onNext, 
        onError, onCompleted));                             // 1
}

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, 
      Observable<T> observable) {
    // 省略参数检查代码
    subscriber.onStart();                                   // 2
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);     // 3
    }

    try {
        RxJavaHooks.onObservableStart(observable, 
            observable.onSubscribe).call(subscriber);       // 4
        return RxJavaHooks.onObservableReturn(subscriber);  // 5
    } catch (Throwable e) {
        // 省略错误处理代码
    }
}
  1. 我们首先对传入的 Action 进行包装,包装为 ActionSubscriber,一个 Subscriber 的实现类。
  2. 调用 subscriber.onStart() 通知 subscriber 它已经和 observable 连接起来了。这里我们就知道,onStart() 就是在我们调用 subscribe() 的线程执行的。
  3. 如果传入的 subscriber 不是 SafeSubscriber,那就把它包装为一个SafeSubscriber。
  4. 我们跳过 hook,认为它什么也没做,那这里我们调用的其实是observable.onSubscribe.call(subscriber),这里我们就看到了前面提到的 onSubscribe 的使用代码,在我们调用 subscribe() 的线程执行这个回调。
  5. 跳过 hook,那么这里就是直接返回了subscriber, Subscriber继承了Subscription,用于取消订阅。

我们应该还记得OnSubscribeMap中的call方法吧,这里的observable.onSubscribe.call(subscriber)调用的就是OnSubscribeMap.call()方法。 在OnSubscribeMap.call()之中,有一段代码:source.unsafeSubscribe(parent);它会继续回溯去调用上一个observable.onSubscribe.call()的call方法,而这个call方法就是JustOnSubscribe中的call方法

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
           // 省略错误处理代码  
        }
            return Subscriptions.unsubscribed();
        }
    }

整个过程如下:


RxJava顺序流程.png

这里我们可以看到RxJava中存在这样一种嵌套关系:


RxJava回溯执行的过程.png

线程调度

前面的过程都是通过函数调用来完成的,都在subscribe所在的线程执行,RxJava进行异步非常简单,只需要使用 subscribeOn 和 observeOn 这两个操作符即可。既然它俩都是操作符,那流程上就是和 map 差不多的,这里我们主要关注线程调度的实现原理。subscribeOn和observeOn操作符的调用者是Observable<T>,方法参数是Scheduler,它们的区别是subscribeOn决定的是上游Observable的执行线程,observeOn决定的是下游的Subscriber回调执行的线程,下面我们来看具体是怎么实现的。

subscribeOn

追踪subscribeOn的调用过程,其调用过程通过OperatorSubscribeOn进行了一次转换。过程如下:

  1. 获取Scheduler中的Worker对象inner;
  2. 将Subscriber包装成SubscribeOnSubscriber,这个是parentSubcriber;
  3. inner.schedule(parent) 执行具体过程
  4. SubscribeOnSubscriber中的setProducer方法中,做了进一步的线程调度
  5. 如果当前是在同一个线程中,直接request;
  6. 如果不在同一个线程中,发生一次线程调度

那么,这两次调度有什么区别呢?简单的说:
inner.schedule(parent)调度影响的是Subscriber的回调,也就是下游的监听;
setProducer调度影响的是上游数据的request;
所以subscribeOn影响的是上下游的执行线程,下游如果要切换线程,需要通过observeOn进行切换

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;
    final boolean requestOn;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
        this.scheduler = scheduler;
        this.source = source;
        this.requestOn = requestOn;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }

    ...
}

SubscribeOnSubscriber

static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

    ...
    @Override
    public void setProducer(final Producer p) {
        actual.setProducer(new Producer() {
        @Override
        public void request(final long n) {
            if (t == Thread.currentThread() || !requestOn) {
                p.request(n);
            } else {
                worker.schedule(new Action0() {
                    @Override
                    public void call() {
                        p.request(n);
                    }
                });
            }
        }
        });
    }
}

observeOn

同样的,我们追踪observeOn。过程如下:

  1. 创建OperatorObserveOn,继承自Operator;
  2. 通过lift操作符进行切换
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}


public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

OnSubscribeLift

它的逻辑是先对下游 subscriber 用操作符进行处理,处理会返回一个新的subscriber,然后通知处理后的 subscriber,它将要和 observable 连接起来了,最后把它和上游连接起来。

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    @Override
    public void call(Subscriber<? super R> o) {
        ...
        Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        st.onStart();
        parent.call(st);
        ...
    }
            
}

OperatorObserveOn

作为操作符的逻辑,也比较简单,如果 scheduler 是 ImmediateScheduler/TrampolineScheduler,就什么也不做,否则就把 subscriber 包装为 ObserveOnSubscriber,看来脏活累活都是 ObserveOnSubscriber 干的了。

public final class OperatorObserveOn<T> implements Operator<T, T> {
    // ...

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(
                scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    // ...
}

ObserveOnSubscriber

继续看ObserveOnSubscriber,它是observeOn只有生成的新的subscriber, 下面的源码是简化之后的实现。我们可以看到它调度了每个单独的subscriber.onXXX() 方法。所以这就是observeOn调度只影响subscriber的原因了!!!!

Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);
 
    source.unsafeSubscribe(new Subscriber<T>(subscriber) {
        @Override
        public void onNext(T t) {
            worker.schedule(() -> subscriber.onNext(t));
        }
         
        @Override
        public void onError(Throwable e) {
            worker.schedule(() -> subscriber.onError(e));
        }
 
        @Override
        public void onCompleted() {
            worker.schedule(() -> subscriber.onCompleted());
        }            
    });
});

RxJava应用举例

使用RxJava实现从DB load 数据

通过Observable 提供的系列create方法创建, create系列方法有:

  1. Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure)

  2. Observable<T> unsafeCreate(OnSubscribe<T> f)

  3. Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)

  4. Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe)

这里采用第二个方法创建:

Observable.unsafeCreate(new rx.Observable.OnSubscribe<Data>() {
            @Override
            public void call(Subscriber<? super Data> subscriber) {
                Data data = null;
                // sql操作,loadFromDB
                subscriber.onNext(data);
                subscriber.onCompleted();
            }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Action1<Data>() {
                @Override
                public void call(Data data) {
                    // handle data
                }
            });

使用RxJava实现分页数据加载

开发中,我们会遇到这样的场景,某个接口采用分页拉取方式,初始化时我们可能需要循环去拉,一次性把数据全部拉取到,假定你不能通过limit设置成无限大的方法拉取一次。这种场景,一般处理可能是循环迭代拉,如果采用RxJava则会非常方便。

protected void fetchPatients() {
        Observable observable = Observable.range(0, Integer.MAX_VALUE)
                .concatMap(new Func1<Integer, Observable<List<Data>>>() {
                    @Override
                    public Observable<List<Data>> call(Integer page) {
                        return getPageObservable(page);
                    }
                })
                .takeWhile(new Func1<List<Data>, Boolean>() {
                    @Override
                    public Boolean call(List<Data> data) {
                        return data.size() < FETCH_LIMIT;
                    }
                }).reduce(new ArrayList<Data>(), new Func2<ArrayList<Data>, List<Data>, ArrayList<Data>>() {
                    @Override
                    public ArrayList<Data> call(ArrayList<Data> datas, List<Data> datas2) {
                        datas.addAll(datas2);
                        return datas;
                    }
                })
                .map(new Func1<List<Data>, List<Data>>() {
                    @Override
                    public List<Data> call(List<Data> datas) {
                        // do some last handle
                        return datas;
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.trampoline())
                .subscribe(new Action1() {
                    @Override
                    public void call(List<Data> datas) {
                        
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                       
                    }
                });
    }

    protected Observable getPageObservable(int page) {
        Observable observable = apiService.getPager(page, FETCH_LIMIT)
                .map(new Func1<List<Data>, List<Data>>() {
                    @Override
                    public List<Data> call(List<Data> datas) {
                        // do some pre handle
                        return datas;
                    }

                });
        return observable;
    }

总结

本文从最简单的用例出发,追踪了RxJava的完整过程,也响应了文章开头所提的四个步骤:

  1. 事件流源头(observable)怎么发出数据
  2. 响应者(subscriber)怎么收到数据
  3. 操作符如何运作(operator/transformer)
  4. 整个过程的调度(scheduler)

关于RxJava,还有两个核心的问题:

  1. RxJava调度器Scheduler
  2. RxJava中的背压概念

这两个问题我会在后续的文章中继续论述