阅读 166

ObservableReplay与PublishSubject在一起所产生的化学效应

本文作为《Java编程方法论:响应式Rxjava与代码设计实战》一书第二章 Rxjava中的Subject一节的补充解读。

首先来看一个Demo:

@Test
    void replay_PublishSubject_test() {
        PublishSubject<Object> publishSubject = PublishSubject.create();
        ConnectableObservable<Object> replay = publishSubject.replay();
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        List<Integer> integers = new ArrayList<>();
        for (int i=1;i<10;i++){
            integers.add(i);
        }
        Disposable subscribe1 = replay.subscribe(x -> {
            log("一郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

        Disposable subscribe2 = replay.subscribe(x -> {
            log("二郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        Disposable subscribe3 = replay.subscribe(x -> {
            log("三郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        AtomicInteger atomicInteger = new AtomicInteger(integers.size());
        try {
            forkJoinPool.submit(() -> {
                integers.forEach(id -> {
                    sleep(1,TimeUnit.SECONDS);
                    publishSubject.onNext(id);
                    if (atomicInteger.decrementAndGet() == 0) {
                        publishSubject.onComplete();
                    }
                });
            });
          
            replay.connect();
            sleep(2,TimeUnit.SECONDS);
            subscribe1.dispose();
            sleep(1,TimeUnit.SECONDS);
            //replay.connect(consumer -> consumer.dispose());
            publishSubject.onComplete();
            System.out.println("test");

        } finally  {
            try {
                forkJoinPool.shutdown();
                int shutdownDelaySec = 2;
                System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
                forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
            } catch (Exception ex) {
                System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            } finally {
                System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
                List<Runnable> l = forkJoinPool.shutdownNow();
                System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
            }
        }
    }
复制代码

得到的结果如下所示:

ForkJoinPool.commonPool-worker-3: 一郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 1
ForkJoinPool.commonPool-worker-3: 三郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 2
ForkJoinPool.commonPool-worker-3: 三郎神: 2
Emission completed
Emission completed
test
………………等待 2 秒后结束服务……………… 
调用 forkJoinPool.shutdownNow()结束服务...
还剩 0 个任务等待被执行,服务已关闭 
复制代码

在调用subscribe1.dispose()的时候,完成了订阅者自行解除订阅关系的约定,而假如后面调用的是replay.connect(consumer -> consumer.dispose()),依然会在发送元素的过程中强行中断,不带任何通知。而在使用publishSubject.onComplete()后,则可以很优雅地通知后续订阅者优雅地结束。 如图2-3所示,我们按照图中文字操作,并在System.out.println("test")这行打断点查看状态,发现其他2个订阅者并没有被移除,为什么会出现这种情况?

通过publishSubject.replay(),我们得到了一个ConnectableObservable对象,具体如下:

//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
    return ObservableReplay.createFrom(this);
}
复制代码

结合前面ConnectableObservable相关知识的学习,在调用replay.subscribe(...)时,会将下游的订阅者与DEFAULT_UNBOUNDED_FACTORY所得到的UnboundedReplayBuffer对象通过一个ReplayObserver对象建立起联系:

//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
    return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source,
        final BufferSupplier<T> bufferFactory) {
    // the current connection to source needs to be shared between the operator and its onSubscribe call
    final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
    //注意此处
    ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
    //此处这个curr会作为ObservableReplay下current字段的值,记住,它是个引用类型对象
    return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
        for (;;) {
            ReplayObserver<T> r = curr.get();
            if (r == null) {
                ReplayBuffer<T> buf = bufferFactory.call();
                ReplayObserver<T> u = new ReplayObserver<T>(buf);
                //此时ObservableReplay中current字段的值所指对象也会发生改变
                if (!curr.compareAndSet(null, u)) {
                    continue;
                }
                r = u;
            }
            InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
            child.onSubscribe(inner);
            //通过ReplayObserver的observers字段将下游订阅者管理起来
            r.add(inner);
            if (inner.isDisposed()) {
                r.remove(inner);
                return;
            }
            //此处UnboundedReplayBuffer对象与下游订阅者建立联系
            r.buffer.replay(inner);
            break; 
        }
    }
}
复制代码

当调用replay.connect(consumer -> consumer.dispose())时,通过current获取上面得到的ReplayObserver对象,并调用该对象的dispose()方法(由replay.connect(...)中传入的Consumer实现可得),此时会将ObservableReplay中的observers字段设定为TERMINATED,同时将ObservableReplay自身身为AtomicReference角色所存储值设定为DISPOSED,即将ObservableReplaycurrent的值设定为了DISPOSED

//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
    boolean doConnect;
    ReplayObserver<T> ps;
    for (;;) {
        ps = current.get();
        if (ps == null || ps.isDisposed()) {
            ReplayBuffer<T> buf = bufferFactory.call();
            ReplayObserver<T> u = new ReplayObserver<T>(buf);
            if (!current.compareAndSet(ps, u)) {
                continue;
            }
            ps = u;
        }
        doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
        break; 
    }
    
    try {
        connection.accept(ps);
    } catch (Throwable ex) {
        if (doConnect) {
            ps.shouldConnect.compareAndSet(true, false);
        }
        Exceptions.throwIfFatal(ex);
        throw ExceptionHelper.wrapOrThrow(ex);
    }
    if (doConnect) {
        source.subscribe(ps);
    }
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
    observers.set(TERMINATED);
    DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
复制代码

可以看到,ReplayObserver只是解除了与下游订阅者的关系,但并没有进一步对下游订阅者进行结束的操作,这样与UnboundedReplayBuffer对象建立联系的订阅者,如果buffer中的元素还未消费完毕,会持续消费直至所存元素下发完毕,但要注意的是,该buffer中并未存放结束事件(即通过调用UnboundedReplayBuffer#complete往该队列中存放NotificationLite.complete()元素)。同时下游订阅者也并未调用dispose()方法,所以下面所示源码中的output.isDisposed()结果为false。请注意下面所示源码中<1>处的代码:

public void replay(InnerDisposable<T> output) {
        if (output.getAndIncrement() != 0) {
            return;
        }

        final Observer<? super T> child = output.child;

        int missed = 1;

        for (;;) {
            if (output.isDisposed()) {
                return;
            }
            int sourceIndex = size;

            Integer destinationIndexObject = output.index();
            int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;

            while (destinationIndex < sourceIndex) {
                Object o = get(destinationIndex);
                //此处很关键
                if (NotificationLite.accept(o, child)) {//<1>
                    return;
                }
                if (output.isDisposed()) {
                    return;
                }
                destinationIndex++;
            }

            output.index = destinationIndex;
            missed = output.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
    if (o == COMPLETE) {
        s.onComplete();
        return true;
    } else
    if (o instanceof ErrorNotification) {
        s.onError(((ErrorNotification)o).e);
        return true;
    }
    s.onNext((T)o);
    return false;
}
复制代码

如果调用了UnboundedReplayBuffer#complete,那么在元素下发到最后时,就会出现o == COMPLETEtrue,此时会调用下游订阅者的onComplete()方法。

//ObservableReplay.UnboundedReplayBuffer#complete
public void onComplete() {
    if (!done) {
        done = true;
        buffer.complete();
        replayFinal();
    }
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
    add(NotificationLite.complete());
    size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
    return COMPLETE;
}
复制代码

至此,关于replay_PublishSubject_test()示例中所展现的疑点已经解读完毕。