JDK9 SubmissionPublisher源码解读:

781 阅读4分钟

JDK版本:13

public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
    if (consumer == null)
        throw new NullPointerException();
    CompletableFuture<Void> status = new CompletableFuture<>();
    //绑定Publisher、Subscriber,传入ConsumerSubscriber,包装一个Consumer
    subscribe(new ConsumerSubscriber<T>(status, consumer));
    return status;
}
public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
        //max默认取Flow.defaultBufferSize()=256,,,INITIAL_CAPACITY为2的指数倍,默认32,此时Object[32]
        int max = maxBufferCapacity; // allocate initial array
        Object[] array = new Object[max < INITIAL_CAPACITY ?
                                    max : INITIAL_CAPACITY];
    
    	//将Subscriber包装为一个BufferedSubscription
        BufferedSubscription<T> subscription =
            new BufferedSubscription<T>(subscriber, executor, onNextHandler,
                                        array, max);
        synchronized (this) {
            if (!subscribed) {
                //初次订阅subscribed设为true,拿到当前线程
                subscribed = true;
                owner = Thread.currentThread();
            }
            //下面整段for循环解读:初次订阅,clients指向上面new的BufferedSubscription,此时pred=null。后续订阅时,将上面new的BufferedSubscription放在链尾,并且会remove链表之前已经处于close状态的BufferedSubscription。之后执行subscription.onSubscribe();clients = subscription;break;退出循环。可以看到SubmissionPublisher对每一个Suscriber将其包装成一个独立的BufferedSubscription放入链表并执行其onSubscribe()方法。
            for (BufferedSubscription<T> b = clients, pred = null;;) {
                if (b == null) {
                    Throwable ex;
                    subscription.onSubscribe();
                    if ((ex = closedException) != null)
                        subscription.onError(ex);
                    else if (closed)
                        subscription.onComplete();
                    else if (pred == null)
                        clients = subscription;
                    else
                        pred.next = subscription;
                    break;
                }
                BufferedSubscription<T> next = b.next;
                if (b.isClosed()) {   // remove
                    b.next = null;    // detach
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else if (subscriber.equals(b.subscriber)) {
                    b.onError(new IllegalStateException("Duplicate subscribe"));
                    break;
                }
                else
                    pred = b;
                b = next;
            }
        }
    }

再来看BufferedSubscription#onSubscribe():

final void onSubscribe() {
            startOnSignal(RUN | ACTIVE);
        }
//BufferedSubscription#onSubscribe()根据状态,执行BufferedSubscription#tryStart()
//将BufferedSubscription包装成ConsumerTask交给ForkJoinPool线程池执行BufferedSubscription#consume()方法。
final void tryStart() {
    try {
        Executor e;
        ConsumerTask<T> task = new ConsumerTask<T>(this);
        if ((e = executor) != null)   // skip if disabled on error
            e.execute(task);
    } catch (RuntimeException | Error ex) {
        getAndBitwiseOrCtl(ERROR | CLOSED);
        throw ex;
    }
}

再来看BufferedSubscription#consume():

final void consume() {
    Subscriber<? super T> s;
    if ((s = subscriber) != null) {          // hoist checks
        subscribeOnOpen(s);
        long d = demand;
        //for循环,一直从BufferedSubscription的Object[] array 中获取元素消费
        for (int h = head, t = tail;;) {
            int c, taken; boolean empty;
            if (((c = ctl) & ERROR) != 0) {
                closeOnError(s, null);
                break;
            }
            //
            else if ((taken = takeItems(s, d, h)) > 0) {
                head = h += taken;
                d = subtractDemand(taken);
            }
            else if ((d = demand) == 0L && (c & REQS) != 0)
                weakCasCtl(c, c & ~REQS);    // exhausted demand
            else if (d != 0L && (c & REQS) == 0)
                weakCasCtl(c, c | REQS);     // new demand
            else if (t == (t = tail)) {      // stability check
                if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                    closeOnComplete(s);      // end of stream
                    break;
                }
                else if (empty || d == 0L) {
                    int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                    if (weakCasCtl(c, c & ~bit) && bit == RUN)
                        break;               // un-keep-alive or exit
                }
            }
        }
    }
}

再来看BufferedSubscription#takeItems(Subscriber<? super T> s, long d, int h):

final int takeItems(Subscriber<? super T> s, long d, int h) {
            Object[] a;
            int k = 0, cap;
            if ((a = array) != null && (cap = a.length) > 0) {
                int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
                int n = (d < (long)b) ? (int)d : b;
                for (; k < n; ++h, ++k) {
                    //从array中获取索引为h的元素x
                    Object x = QA.getAndSet(a, h & m, null);
                    if (waiting != 0)
                        signalWaiter();
                    if (x == null)
                        break;
                    //ConsumerSubscriber消费元素x
                    else if (!consumeNext(s, x))
                        break;
                }
            }
            return k;
}

final boolean consumeNext(Subscriber<? super T> s, Object x) {
    try {
        @SuppressWarnings("unchecked") T y = (T) x;
        if (s != null)
            s.onNext(y);
        return true;
    } catch (Throwable ex) {
        handleOnNext(s, ex);
        return false;
    }
}

//调用consumer.accept(item)消费元素
public final void onNext(T item) {
    try {
        consumer.accept(item);
    } catch (Throwable ex) {
        subscription.cancel();
        status.completeExceptionally(ex);
    }
}

接着来看往BufferedSubscription维护的数组Object[]中放入元素的submit(T item)方法:

public int submit(T item) {
    return doOffer(item, Long.MAX_VALUE, null);
}

private int doOffer(T item, long nanos,
                    BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
    if (item == null) throw new NullPointerException();
    int lag = 0;
    boolean complete, unowned;
    synchronized (this) {
        Thread t = Thread.currentThread(), o;
        BufferedSubscription<T> b = clients;
        if ((unowned = ((o = owner) != t)) && o != null)
            owner = null;                     // disable bias
        if (b == null)
            complete = closed;
        else {
            complete = false;
            boolean cleanMe = false;
            BufferedSubscription<T> retries = null, rtail = null, next;
            do {
                next = b.next;
                //
                int stat = b.offer(item, unowned);
                if (stat == 0) {              // saturated; add to retry list
                    b.nextRetry = null;       // avoid garbage on exceptions
                    if (rtail == null)
                        retries = b;
                    else
                        rtail.nextRetry = b;
                    rtail = b;
                }
                else if (stat < 0)            // closed
                    cleanMe = true;           // remove later
                else if (stat > lag)
                    lag = stat;
            } while ((b = next) != null);

            //重试
            if (retries != null || cleanMe)
                lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
        }
    }
    if (complete)
        throw new IllegalStateException("Closed");
    else
        return lag;
}


final int offer(T item, boolean unowned) {
    Object[] a;
    int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
    int t = tail, i = t & (cap - 1), n = t + 1 - head;
    if (cap > 0) {
        boolean added;
        if (n >= cap && cap < maxCapacity) // resize
            //将item放入Object[] a
            added = growAndOffer(item, a, t);
        else if (n >= cap || unowned)      // need volatile CAS
            added = QA.compareAndSet(a, i, null, item);
        else {                             // can use release mode
            QA.setRelease(a, i, item);
            added = true;
        }
        if (added) {
            tail = t + 1;
            stat = n;
        }
    }
    return startOnOffer(stat);
}

总结:JDK9提供的响应式api SubmissionPublisher内部维护了ConsumerSubscriber、BufferedSubscription。通过submit方法产生元素并缓存在每个订阅Subscriber独立的BufferedSubscription中的缓冲buffer。通过consume(Consumer consumer)方法,将consumer包装成内部的ConsumerSubscriber,并调用subscribe(),通过创建独立的BufferedSubscription将Publisher、Subscriber建立关系消费元素。如何实现的呢?BufferedSubscription通过调用onSubscribe()将消费元素的动作(从BufferedSubscription中的缓冲buffer中获取元素消费)包装成SubmissionPublisher内部的ConsumerTask对象,并交给ForkJoinPool线程池(也可以自指定)执行。