JDK版本:13
public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
if (consumer == null)
throw new NullPointerException();
CompletableFuture<Void> status = new CompletableFuture<>();
//绑定Publisher、Subscriber,传入ConsumerSubscriber,包装一个Consumer
subscribe(new ConsumerSubscriber<T>(status, consumer));
return status;
}
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
//max默认取Flow.defaultBufferSize()=256,,,INITIAL_CAPACITY为2的指数倍,默认32,此时Object[32]
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
//将Subscriber包装为一个BufferedSubscription
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
synchronized (this) {
if (!subscribed) {
//初次订阅subscribed设为true,拿到当前线程
subscribed = true;
owner = Thread.currentThread();
}
//下面整段for循环解读:初次订阅,clients指向上面new的BufferedSubscription,此时pred=null。后续订阅时,将上面new的BufferedSubscription放在链尾,并且会remove链表之前已经处于close状态的BufferedSubscription。之后执行subscription.onSubscribe();clients = subscription;break;退出循环。可以看到SubmissionPublisher对每一个Suscriber将其包装成一个独立的BufferedSubscription放入链表并执行其onSubscribe()方法。
for (BufferedSubscription<T> b = clients, pred = null;;) {
if (b == null) {
Throwable ex;
subscription.onSubscribe();
if ((ex = closedException) != null)
subscription.onError(ex);
else if (closed)
subscription.onComplete();
else if (pred == null)
clients = subscription;
else
pred.next = subscription;
break;
}
BufferedSubscription<T> next = b.next;
if (b.isClosed()) { // remove
b.next = null; // detach
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber)) {
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
}
else
pred = b;
b = next;
}
}
}
再来看BufferedSubscription#onSubscribe():
final void onSubscribe() {
startOnSignal(RUN | ACTIVE);
}
//BufferedSubscription#onSubscribe()根据状态,执行BufferedSubscription#tryStart()
//将BufferedSubscription包装成ConsumerTask交给ForkJoinPool线程池执行BufferedSubscription#consume()方法。
final void tryStart() {
try {
Executor e;
ConsumerTask<T> task = new ConsumerTask<T>(this);
if ((e = executor) != null) // skip if disabled on error
e.execute(task);
} catch (RuntimeException | Error ex) {
getAndBitwiseOrCtl(ERROR | CLOSED);
throw ex;
}
}
再来看BufferedSubscription#consume():
final void consume() {
Subscriber<? super T> s;
if ((s = subscriber) != null) { // hoist checks
subscribeOnOpen(s);
long d = demand;
//for循环,一直从BufferedSubscription的Object[] array 中获取元素消费
for (int h = head, t = tail;;) {
int c, taken; boolean empty;
if (((c = ctl) & ERROR) != 0) {
closeOnError(s, null);
break;
}
//
else if ((taken = takeItems(s, d, h)) > 0) {
head = h += taken;
d = subtractDemand(taken);
}
else if ((d = demand) == 0L && (c & REQS) != 0)
weakCasCtl(c, c & ~REQS); // exhausted demand
else if (d != 0L && (c & REQS) == 0)
weakCasCtl(c, c | REQS); // new demand
else if (t == (t = tail)) { // stability check
if ((empty = (t == h)) && (c & COMPLETE) != 0) {
closeOnComplete(s); // end of stream
break;
}
else if (empty || d == 0L) {
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
if (weakCasCtl(c, c & ~bit) && bit == RUN)
break; // un-keep-alive or exit
}
}
}
}
}
再来看BufferedSubscription#takeItems(Subscriber<? super T> s, long d, int h):
final int takeItems(Subscriber<? super T> s, long d, int h) {
Object[] a;
int k = 0, cap;
if ((a = array) != null && (cap = a.length) > 0) {
int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
int n = (d < (long)b) ? (int)d : b;
for (; k < n; ++h, ++k) {
//从array中获取索引为h的元素x
Object x = QA.getAndSet(a, h & m, null);
if (waiting != 0)
signalWaiter();
if (x == null)
break;
//ConsumerSubscriber消费元素x
else if (!consumeNext(s, x))
break;
}
}
return k;
}
final boolean consumeNext(Subscriber<? super T> s, Object x) {
try {
@SuppressWarnings("unchecked") T y = (T) x;
if (s != null)
s.onNext(y);
return true;
} catch (Throwable ex) {
handleOnNext(s, ex);
return false;
}
}
//调用consumer.accept(item)消费元素
public final void onNext(T item) {
try {
consumer.accept(item);
} catch (Throwable ex) {
subscription.cancel();
status.completeExceptionally(ex);
}
}
接着来看往BufferedSubscription维护的数组Object[]中放入元素的submit(T item)方法:
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
synchronized (this) {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
complete = closed;
else {
complete = false;
boolean cleanMe = false;
BufferedSubscription<T> retries = null, rtail = null, next;
do {
next = b.next;
//
int stat = b.offer(item, unowned);
if (stat == 0) { // saturated; add to retry list
b.nextRetry = null; // avoid garbage on exceptions
if (rtail == null)
retries = b;
else
rtail.nextRetry = b;
rtail = b;
}
else if (stat < 0) // closed
cleanMe = true; // remove later
else if (stat > lag)
lag = stat;
} while ((b = next) != null);
//重试
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
}
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
}
final int offer(T item, boolean unowned) {
Object[] a;
int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
int t = tail, i = t & (cap - 1), n = t + 1 - head;
if (cap > 0) {
boolean added;
if (n >= cap && cap < maxCapacity) // resize
//将item放入Object[] a
added = growAndOffer(item, a, t);
else if (n >= cap || unowned) // need volatile CAS
added = QA.compareAndSet(a, i, null, item);
else { // can use release mode
QA.setRelease(a, i, item);
added = true;
}
if (added) {
tail = t + 1;
stat = n;
}
}
return startOnOffer(stat);
}
总结:JDK9提供的响应式api SubmissionPublisher内部维护了ConsumerSubscriber、BufferedSubscription。通过submit方法产生元素并缓存在每个订阅Subscriber独立的BufferedSubscription中的缓冲buffer。通过consume(Consumer consumer)方法,将consumer包装成内部的ConsumerSubscriber,并调用subscribe(),通过创建独立的BufferedSubscription将Publisher、Subscriber建立关系消费元素。如何实现的呢?BufferedSubscription通过调用onSubscribe()将消费元素的动作(从BufferedSubscription中的缓冲buffer中获取元素消费)包装成SubmissionPublisher内部的ConsumerTask对象,并交给ForkJoinPool线程池(也可以自指定)执行。