RxJava2 Flowable源码浅析

1,359 阅读7分钟

关于Observable的源码解析可以看Rxjava2 Observable源码浅析

关于Subject的源码解析可以看RxJava2 Subject源码浅析

前言

看过Rxjava2 Observable源码浅析的你会发现其实Rxjava的实现套路都差不多,所以其实Flowable也差不多,只是在实现的细节上稍微有些差异而已。

背景

Flowable的出现其实主要是为了解决在异步模型中上下游数据发送和接收的差异性而存在的。上游发送速度大于下游接收速度时就会产生数据积压导致OOM,而Flowable就提供了背压(BackPressure) 策略来处理数据积压问题。

流程

从最原始的Flowable#create开始

//FlowableOnSubscribe就是最原始的数据源发生器
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    //将FlowableOnSubscribe转化成了FlowableCreate
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}

可以看到create方法也是将数据源进行了一层封装。而subscribe方法和Observable#subscribe就是差不多,最终还是调用的Flowable#subscribeActual,而这里就是FlowableCreate#subscribeActual

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根据不同的背压策略实现不同Emitter
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        //一般来说在Subscriber#onSubscribe,调用emitter.request指定拉取上游多少数据
        t.onSubscribe(emitter);
        try {
            //将上下游关联
            //调用Flowable#create一开始创建的FlowableOnSubscribe#subscribe
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}

可以才看到这里核心就是根据不用背压策略实现不同的Emitter。一般来说在Subscriber#onSubscribe,调用emitter.request指定拉取上游多少数据,从而通过背压策略对数据下发的策略不同。

BaseEmitter

    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            complete();
        }

        protected void complete() {
            if (isCancelled()) {
                return;
            }
            try {
                actual.onComplete();
            } finally {
                serial.dispose();
            }
        }

        @Override
        public final void onError(Throwable e) {
            //尝试下发完成缓存数据
            if (!tryOnError(e)) {
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public boolean tryOnError(Throwable e) {
            return error(e);
        }

        protected boolean error(Throwable e) {
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (isCancelled()) {
                return false;
            }
            try {
                actual.onError(e);
            } finally {
                serial.dispose();
            }
            return true;
        }

        @Override
        public final void cancel() {
            serial.dispose();
            onUnsubscribed();
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        @Override
        public final void request(long n) {
            //记录请求的个数
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable s) {
            serial.update(s);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            return get();
        }
        .....
    }

这里可以看到BaseEmitter通过自身继承AtomicLong取记录请求个数,而不是通过锁或者volatile来提高性能。

MissingEmitter - BackpressureStrategy.MISSING

不做任何处理,由下游自行处理overflow。MissingEmitter实现很简单。

    static final class MissingEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            //这里可以看出,对应数据下发没有任何限制
            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //request减1
            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }

BufferAsyncEmitter - BackpressureStrategy.BUFFER

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 2427151001689639875L;

    final SpscLinkedArrayQueue<T> queue;///数据缓存列表

    Throwable error;
    volatile boolean done;//标记是否onComplete或onError

    final AtomicInteger wip;//标记调用了多少次drain

    BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
        super(actual);
        this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
        this.wip = new AtomicInteger();
    }

    @Override
    public void onNext(T t) {
        if (done || isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        queue.offer(t);///数据入队列
        drain();//检测并下发数据
    }

    @Override
    public boolean tryOnError(Throwable e) {
        if (done || isCancelled()) {
            return false;
        }

        if (e == null) {
            e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }

        error = e;
        done = true;//标记完成
        drain();//检测并下发未完成数据
        return true;
    }

    @Override
    public void onComplete() {//仅标记,若队列有数据继续下发完成
        done = true;//标记完成
        drain();//检测并下发未完成数据
    }

    @Override
    void onRequested() {//#request(long n)后调用
        drain();//检测并下发数据
    }

    @Override
    void onUnsubscribed() {
        if (wip.getAndIncrement() == 0) {
            queue.clear();
        }
    }

    void drain() {
        //类似于if(wip++ != 0)
        //所以这里多次调用#drain只有第一次调用才会通过,或者已经清空队列等待一下调用#drain
        if (wip.getAndIncrement() != 0) {
            return;
        }

        int missed = 1;
        final Subscriber<? super T> a = actual;
        final SpscLinkedArrayQueue<T> q = queue;

        for (; ; ) {
            long r = get();//数据请求数,由#request决定
            long e = 0L;

            while (e != r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //是否已完成,调用onComplete/onError后会标记done==true
                boolean d = done;
                //获取队列第一条数据
                T o = q.poll();
                //用于标记队列是否为空
                boolean empty = o == null;
                //已标记完成且队列为空,调用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
                //队列为空,退出获取数据循环
                if (empty) {
                    break;
                }
                //下发数据
                a.onNext(o);
                //标记已下发数据
                e++;
            }
            //数据下发量和请求数相符
            if (e == r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //标记是否完成
                boolean d = done;
                //标记队列是否为空
                boolean empty = q.isEmpty();
                //队列为空且已完成,调用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
            }
            //request数减去已经下发数
            if (e != 0) {
                BackpressureHelper.produced(this, e);
            }
            //已处理一次drain,wip-missed避免错过多次调用drain
            //和Observable#observeOn时的ObserveOnObserver#drainNormal处理方式一样
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

这里的#drain下发数据方法和Observable#observeOn->ObserveOnObserver#drainNormal的处理方式是有点相似的。通过本身记录request数和wip协调下发数据量及正确的下发。在调用Subscriber#onSubscribeEmitter#onNextEmitter#onComplete都会触发#drain尝试去下发缓存的数据。其中Emitter#onNext时先缓存数据在尝试下发,而且数据还没下发完成前调用onCompleteonError(这里重写了tryOnError)仅先标记完成,还要等数据完全下发才会真正调用actual对应方法。

其实这里我们还是可以学到一些东西的:

  • 如果可以的话,使用Atomic包下的类代替volatile和锁提高性能
  • 使用missedwip来协调多线程分发任务
  • 多线程中标志位的判断最好通过临时变量存储判断并多次判断

LatestAsyncEmitter - BackpressureStrategy.LATEST

BackpressureStrategy.LATEST当数据背压时只会缓存最后一次下发的数据(通过AtomicReference来缓存)。具体实现原理和BackpressureStrategy.BUFFER较为类似就不贴代码了。

BackpressureStrategy.DROP & BackpressureStrategy.ERROR

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //是否已达请求数
            if (get() != 0) {
                actual.onNext(t);//未达请求数,下发
                BackpressureHelper.produced(this, 1);//请求数减1
            } else {
                onOverflow();//已超过请求,调用对应策略方法
            }
        }
        //
        abstract void onOverflow();
}

BackpressureStrategy.DROP对应的DropAsyncEmitterBackpressureStrategy.ERROR对应的ErrorAsyncEmitter都是继承于NoOverflowBaseAsyncEmitter。实现方式也是很简单,仅仅在onNext判断一下是否已经到达了请求数,未到达就下发,若到达了调用onOverflow()处理溢出方案。

BackpressureStrategy.DROP的溢出方案为空实现即舍去溢出数据 BackpressureStrategy.ERROR的溢出方案为调用onError即溢出时报错

总结

MISS策略需要下游自行处理背压问题

BUFFER策略则在还有数据未下发完成时就算上游调用onCompleteonError也会等待数据下发完成

LATEST策略则当产生背压时仅会缓存最新的数据

DROP策略为背压时抛弃背压数据

ERROR策略是背压时抛出异常调用onError

在学习源码时得到的一些关于多线程的领悟:

  • 如果可以的话,使用Atomic包下的类代替volatile和锁提高性能
  • 使用missedwip来协调多线程分发任务
  • 多线程中标志位的判断最好通过临时变量存储判断并多次判断