阅读 264

CompletableFuture源码分析

概述

CompletableFuture是1.8加入的异步处理的Future,我们知道之前有个Future接口,但调用future.get方法时,主线程会阻塞,有时还是影响性能,能不能给你个回调函数处理完后接着处理就好。CompletableFuture就这样诞生了。
类结构

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

    volatile Object result;       // 异步处理的结果或者异常数据
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    。。。
}
复制代码

上面简单看了CompletableFuture的类结构,可以看到CompletableFuture实现了Future、CompletionStage接口,我们着重看下CompletionStage中的方法,提供的异步处理能力。
例子

private static void testUseAsyncMethod() throws InterruptedException, ExecutionException {
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            sleepOneSecond();
            return "CC";
        }).thenApply(name -> {
            return "Hello " + name;
        }).thenApply(greeting -> {
            return greeting + ", Welcome to the World";
        });
        System.out.println(result.get());
    }
复制代码

例子仍然能够在github中下载,这里贴出来一部分方便我们串讲源码。
我们先看第一个使用的supplyAsync方法。

//静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    //新建CompletableFuture对象
    CompletableFuture<U> d = new CompletableFuture<U>();
    //构造AsyncSupply对象,线程池提交AsyncSupply任务
    e.execute(new AsyncSupply<U>(d, f));
    //将CompletableFuture对象返回
    return d;
}
复制代码
//可以看到AsyncSupply是一个Runnable对象
static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

    //重写exec方法、 run方法
    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //CompletableFuture对象的result为空时
            if (d.result == null) {
                try {
                    //调用传入的supplier的get方法,并将结果放入result字段
                    //注意:这是在线程池中提交的,所以是异步处理的
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //处理完依赖方法后,处理栈顶方法,会和后面的回调方法入栈呼应
            d.postComplete();
        }
    }
}
复制代码
final void postComplete() {
    //弹出依赖的栈顶数据进行处理
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
复制代码

再看下例子中thenApply方法,异步处理完supplyAsync后的回调方法。

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    //线程池为空,则尝试看依赖的方法有没有处理完成,或发生异常
    //线程池不为空或者依赖的方法还没有处理完成,则将回调方法构造成UniApply,push到等待栈里面
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        //尝试处理目标回调方法
        c.tryFire(SYNC);
    }
    return d;
}
复制代码

一起看看uniApply方法依赖的方法处理完成则尝试处理回调方法

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {
    Object r; Throwable x;
    //a是依赖方法,result为空,说明还没有处理完成返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //如果依赖的方法发生了异常,处理异常并返回    
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            //判断是否可以执行,不可以执行则直接返回false
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            //到了这里说明依赖的方法处理完成了,调用回调方法调用用依赖方法的结果result做参数
            //注意这里都是线程池中的线程在执行,所以是异步执行
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}
复制代码

push 方法

final void push(UniCompletion<?,?> c) {
    if (c != null) {
        //将回调方法C push到栈顶
        while (result == null && !tryPushStack(c))
            lazySetNext(c, null); // clear on failure
    }
}

final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
复制代码

tryFire方法

//再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法
final CompletableFuture<V> tryFire(int mode) {
    CompletableFuture<V> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniApply(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    //成功处理完依赖方法和回调方法后,进行处理,可能唤醒其他的回调方法或者清理栈
    return d.postFire(a, mode);
}
复制代码

CompletableFuture其他还有很多方法,但都类似,主要都用了上面的方法。