任务编排:CompletableFuture从入门到精通

19,077 阅读16分钟

前言

最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理

背景

我们把Runnable理解为最基本的线程任务,只具备在线程下执行一段逻辑的能力。为了获取执行的返回值,创造了Callable和与其配合使用的Future。为了将任务之间进行逻辑编排,就诞生了CompletableFuture。关于如何理解任务的逻辑编排,举一个简单的例子:

打开电脑-更新系统这两个操作是有先后顺序的,但是泡茶和这两个操作没有先后顺序,是可以并行的,而开始办公必须要等待其他操作结束之后才能进行,这就形成了任务编排的执行链。

在IO密集型系统中,类似的场景有很多。因为不同数据集的查询依赖主键不同,A数据集的查询主键是B数据集的一个字段这种情况很常见,通常还需要并发查询多个数据集的数据,所以对于多线程的执行编排是有需求的。

一种解决办法是CountDownLatch,让线程执行到某个地方后进行等待,直到依赖的任务执行结束。对于一些简单的执行链是可以满足的,但是当编排逻辑复杂起来,CountDownLatch会导致代码难以维护和调试。所以诞生了CompletableFuture用来描述和维护任务之间的依赖关系以进行任务编排。在实际应用中,有以下两类场景是适合使用任务编排的:

  • 多数据源请求的流程编排

  • 非阻塞化网关等NIO场景

使用方式

创建与执行

同步方法

和FutureTask类似,CompletableFuture也通过get()方法获取执行结果。但是不同的是,CompletableFuture本身可以不承载可执行的任务(相比FutureTask则必须承载一个可执行的任务Callable),通过一个用于标记执行成功并设置返回值的函数,在使用上也更为灵活,如下:

	CompletableFuture<String> demo = new CompletableFuture<>();
	demo.complete("success");
	System.out.println(demo.get());

执行结果:success

和Future类似,get()函数也是同步阻塞的,调用get函数后线程会阻塞直到调用complete方法标记任务已经执行成功。

除了手动触发任务的完成,也可以让创建对象的同时就标记任务完成:

	CompletableFuture<String> demo = CompletableFuture.completedFuture("success");
	System.out.println(demo.get());

执行结果:success

异步方法

相比于同步方法,异步执行更为常见。比如下面这个例子:

        CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something by thread" + Thread.currentThread().getName());
            return "success";
        });
        System.out.println(demo.get());

执行结果:
do something by threadForkJoinPool.commonPool-worker-9
success

supplyAsync方法接收一个Supplier对象,逻辑函数交给线程池中的线程异步执行

默认会使用ForkJoinPool的公共线程池来执行代码(不推荐),当然也可以指定线程池,如下:

	ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
        System.out.println("do something by thread" + Thread.currentThread().getName());
        return "success";
    }, executor);
    System.out.println(demo.get());

执行结果:
do something by threadpool-1-thread-1
success

如果不需要执行结果,也可以用runAsync方法:

	CompletableFuture.runAsync(() -> {
        System.out.println("do something by thread" + Thread.currentThread().getName());
    });

执行结果:
do something by threadForkJoinPool.commonPool-worker-9

多任务编排

多任务编排是CompletableFuture的核心,这里列举不同的场景来进行说明

一元依赖

步骤2需要依赖步骤1执行完毕才能执行,类似主线程的顺序执行,可以通过以下方式实现:

      ExecutorService executor = Executors.newFixedThreadPool(4);
      CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("执行【步骤1】");
        return "【步骤1的执行结果】";
      }, executor);

      CompletableFuture<String> step2 = step1.thenApply(result -> {
        System.out.println("上一步操作结果为:" + result);
        return "【步骤2的执行结果】";
      });
      System.out.println("步骤2的执行结果:" + step2.get());

执行结果:
执行【步骤1】
上一步操作结果为:【步骤1的执行结果】
步骤2的执行结果:【步骤2的执行结果】

通过thenApply方法,接收上一个CompletableFuture对象的返回值,其中隐含的逻辑是,该处逻辑只有等上一个CompletableFuture对象执行完后才会执行

二元依赖

相比于一元依赖的顺序执行链,二元依赖更为常见,比如下面这个场景:

步骤1和2是并行的,而步骤3需要等步骤1和2执行完之后才能执行,通过CompletableFuture是这么实现的:

        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行【步骤1】");
            return "【步骤1的执行结果】";
        }, executor);

        CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行【步骤2】");
            return "【步骤2的执行结果】";
        }, executor);

        CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> {
            System.out.println("前两步操作结果分别为:" + result1 + result2);
            return "【步骤3的执行结果】";
        });
        
        System.out.println("步骤3的执行结果:" + step3.get());

执行结果:
执行【步骤1】
执行【步骤2】
前两步操作结果分别为:【步骤1的执行结果】【步骤2的执行结果】
步骤3的执行结果:【步骤3的执行结果】

通过thenCombine方法,等待step1和step2都执行完毕后,获取其返回结果并执行一段新的逻辑

多元依赖

当然还可能有下面这种场景,步骤M需要依赖1-N的N个前置节点:

这种情况会更为复杂,实现方式如下:

    	ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行【步骤1】");
            return "【步骤1的执行结果】";
        }, executor);
        CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行【步骤2】");
            return "【步骤2的执行结果】";
        }, executor);
        CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行【步骤3】");
            return "【步骤3的执行结果】";
        }, executor);

        CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3);
        CompletableFuture<String> stepMResult = stepM.thenApply(res -> {
           // 通过join函数获取返回值
           String result1 = step1.join();
           String result2 = step2.join();
           String result3 = step3.join();
        
           return result1 + result2 + result3;
        });
        System.out.println("步骤M的结果:" + stepMResult.get());

执行结果:
执行【步骤1】
执行【步骤2】
执行【步骤3】
步骤M的结果:【步骤1的执行结果】【步骤2的执行结果】【步骤3的执行结果】

通过allOf函数声明当参数中的所有任务执行完毕后,才会执行下一步操作,但是要注意,allOf本身只是定义节点,真正阻塞的位置是thenApply函数。

和之前的方式不同,由于采用了不定变量,所以要通过CompletableFuture#join来获取每个任务的返回值。

除了allOf之外,如果我们需要任意一个任务完成后就执行下一步操作,可以使用anyOf方法,如下:

    // step1/2/3的定义相同
    // ...
    CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);
    System.out.println("步骤M的结果:" + stepM.get());

执行结果:
步骤M的结果:【步骤1的执行结果】

与allOf不同,anyOf的返回值即为第一个执行完毕的任务

工作原理

概念

在讲原理之前,先来了解一下CompletableFuture的定义。在实现上,CompletableFuture继承了Future和CompletionStage

Future毋庸置疑,CompletableFuture最基本的能力就是获取异步计算的结果。CompletionStage则是声明了编排节点的能力,每一个CompletionStage都声明了流程树上的一个节点(见下图)

CompletionStage声明的接口thenXXX,包括thenApply、thenCompose等,定义了节点之间的连接方式(实际情况更为复杂,具体原理参考下节函数分析),通过这种方式,最终定义出一颗流程树,进而实现了多线程的任务编排。CompletionStage的方法返回值通常是另一个CompletionStage,进而构成了链式调用。

结构分析

CompletableFuture里包含两个变量,result和stack

result很好理解,就是当前节点的执行结果。stack就比较复杂,是一个无锁并发栈,声明了当前节点执行完毕后要触发的节点列表,接下来我们详细讲一下

CompletableFuture中的栈设计

Completion是一个无锁并发栈,声明了当前节点执行完毕后要触发的节点列表。在结构上是一个链式节点,其中只包含了一个指向下一个节点的next对象

我们可以看到Completion有繁多的实现类,表示不同的依赖方式。

我们知道,在CompletableFuture中的流程编排是通过thenApply、thenAccept、thenCombine等方式来实现的,

  • thenApply:接收上一步的处理结果,进行下一步消费,并返回结果

  • thenAccept:和thenApply类似,不过无结果返回

  • thenCombine:同时接收两个流程节点,等其都执行完毕后一起处理结果

每个函数实际分别对应了一种Completion实现类,以刚才的三种函数为例,分别对应了UniApply、UniAccept、UniCombine三个对象。所以Completion可以认为是流程编排逻辑的抽象对象,可以理解为流程节点,或者任务节点。

以UniCompletion为例,结构如下:

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // 线程池
    CompletableFuture<V> dep;          // 完成任务依赖的cf
    CompletableFuture<T> src;          // 完成任务所需资源所在的cf

    /**
     * 如果任务可以被执行则返回true,通过FJ标记位保证只有一个线程判断成功。
     * 如果任务是异步的,则在任务启动后通过tryFire来执行任务
     */
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }

    /**
     * 如果dep不为空返回true,用以判断当前任务节点是否已被激活
     */
    final boolean isLive() {
        return dep != null;
    }
}

先来看claim函数,这个比较容易解释,该函数用于判断任务是否可被执行。通过compareAndSetForkJoinTaskTag函数的CAS操作保证只有一个线程执行成功,主要作用就是在多线程情况下确保任务的正确执行。

接下来就是重头戏,源任务与依赖任务,这两个概念是CompletableFuture的核心,贯穿了所有逻辑的执行,只有理解了这两个概念,才能对执行原理有比较透彻的理解

源任务与依赖任务

源任务和依赖任务在UniCompletion中分别为src和dep属性,举个具体的例子,比如下面这段代码:

CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
    return "A";
});

CompletableFuture<String> b = a.thenApply(res -> {
    return "B " + res;
});

调用a.thenApply(Function fn)时,可以认为是生成了一个UniApply的流程节点(具体怎么生成的下文会提到),其中源任务就是a,而依赖任务则是thenApply的返回值。

换个简单的说法,在上面的代码中,我们有a、b两个任务,b任务的完成需要依赖于a任务的完成,所以a会生成一个流程节点(UniApply对象),其中包含了b想要执行完成的全部资源(a的执行结果等),这时a任务就叫做源任务(因为a任务中有任务资源)。而b任务需要依赖a任务来完成,所以b任务叫做依赖任务。

源任务的完成会触发依赖任务的执行,这个就是任务编排的基本原理

函数分析

在本节中,CompletableFuture由于名字太长,会以cf来代指

由于thenAccept、thenCombine函数等逻辑比较类似,我们以最基础的thenApply函数为例进行分析

核心函数

我们先不要直接从thenApply、complete等函数入手,我们先来看这几个核心函数,不明白做什么的不要紧,先理解这几个函数的原理就好

uniApply

CompletableFuture的逻辑在于“只有当X条件满足时,再执行Y逻辑”,uniApply函数就是负责这样的逻辑判断,首先看源码:

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f, UniApply<S> c) {    
    Object r; Throwable x;
    // 1
    if (a == null || (r = a.result) == null || f == null)
        return false;

    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }

        try {
            // 2
            if (c != null && !c.claim())
                return false;

            // 3
            S s = (S) r;
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

整个方法可以分为三段(已在代码中标出),我们分开来说。

第一段,判断所给的任务节点是否已经执行完毕,如果已经执行完毕则进入下一步

第二段,如果有关联的流程节点,则通过claim函数判断当前任务是否可被执行,如果可执行则进入下一步(确保多线程情况下任务的正确执行)

第三段,执行传入的函数并把值设置到当前对象中。

整个逻辑是这样的,首先我们传入了一个cf对象、一个函数,和一个流程节点。只有当传入的cf对象执行完成(result不为空),再执行给定的函数,并把执行结果设置到当前对象中。如果不考虑特殊情况,uniApply方法用一句话解释就是:如果给定的任务已经执行完毕,就执行传入的函数并把执行结果设置到当前对象中

tryFire

uniApply函数仅仅是一个有条件的函数执行器,真正想要达到任务编排还需要其他函数的参与,我们先来看tryFire方法:

final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniApply(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}

tryFire根据关联关系的不同有多种实现,实际执行流程相差不大,这里以常用的UniApply的实现来举例。

首先这个方法接收了一个mode参数,有以下几种取值:

  • -1:传播模式,或者叫嵌套模式。表示任务实际已经执行完毕,只是在传递状态

  • 0:同步模式。任务由当前线程调用处理

  • 1:异步模式。任务需要提交到指定线程池处理

根据mode的不同,实际tryFire执行的流程也会发生很大区别。不过归根到底,tryFire方法的本质是调用了uniApply执行一次任务,如果执行成功,则会清空dep、src等自身属性(清空之后isLive方法会返回false,表示任务已经执行完毕),同时通过postFire方法执行该任务下的其他依赖任务,实现任务的传播执行。

postFire方法由于和tryFire方法关联比较密切,这里放在一起说明:

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
        if (a != null && a.stack != null) {
            if (mode < 0 || a.result == null)
                a.cleanStack();
            else
                a.postComplete();
        }
        if (result != null && stack != null) {
            if (mode < 0)
                return this;
            else
                postComplete();
        }
        return null;
}

这里简单概括一下执行原理,如果是嵌套模式,则清理栈内无效任务,并返回对象本身(可以认为什么都没做);否则通过postComplete方法执行栈内依赖此任务的其他任务项

postComplete

当一个CompletionStage执行完成之后,会触发依赖它的其他CompletionStage的执行,这些Stage的执行又会触发新一批的Stage执行,这就是任务的顺序编排

如果说uniApply是基础功能,是负责线程安全且遵守依赖顺序地执行一个函数,那么postComplete就是核心逻辑,负责当一个任务执行完毕后触发依赖该任务的其他任务项,先来看源码:

final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    // 1
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;

        // 2
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // 3
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

在源码上标记了三个位置,分别代表三层结构,首先是第一层while循环,只要当前对象栈中还有流程节点,那么就循环执行内部逻辑。

第二层,由于continue的存在,和第一层结合起来看就是一个批量压栈的操作,将所有需要触发的依赖树按顺序压入当前对象栈中。

第三层,通过tryFire按顺序触发栈中所有的依赖任务。上节我们可以看到tryFire函数内根据mode的不同会触发不同的逻辑,这里mode指定为NESTED就是为了避免循环调用postComplete

执行函数

几个核心函数介绍完了,接下来我们回到最外层,来看看任务是如何执行的,首先我们以thenApply为例分析核心执行函数

supplyAsync(实际调用为asyncSupplyStage)

该方法用于提交一个任务到线程池中执行,并将该任务打包为一个CompletableFuture节点

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

其中AsyncSupply实现了Runnable接口,所以理解为一种特殊的任务即可。这种任务在执行完成后会调用completeValue将函数执行的结果设置当前对象中。

所以整体逻辑为,首先创建一个cf对象,并立即将任务添加到线程池执行,在执行完毕后会将任务执行的结果保存到所创建的cf对象中。

complete

public boolean complete(T value) {
    boolean triggered = completeValue(value);
	postComplete();
	return triggered;
}

该方法直接调用completeValue方法设置值,设置完值之后调用postComplete方法来依次执行后续任务。当调用该方法时,会实现任务的依赖扩散执行

thenApply(实际调用为uniApplyStage)

private <V> CompletableFuture<V> uniApplyStage(
    	Executor e, Function<? super T,? extends V> f) {
    
    if (f == null) throw new NullPointerException();
    
    CompletableFuture<V> d =  new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

结合上节分析的核心函数,我们很容易可以分析该函数的流程:执行function函数,如果条件不满足则执行失败,会生成一个流程节点并压入栈,同时再通过tryFire再尝试执行一次,如果条件依然不满足,那么只能等待所依赖的任务执行完成后通过postComplete触发执行。

get

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

方法核心在于waitingGet,内部使用了ForkJoinPool.managedBlock来阻塞线程直到执行完毕

流程分析

在函数分析中,我们实际已经说明了任务依赖执行的基本原理,这里为了更为详细地说明,我们以一个简单的例子来分析。

首先我们抛开一切复杂的因素,以最基本的同步串行代码来讲,我们现在有这样一个对象:

    CompletableFuture<String> A = new CompletableFuture<>();

然后我们这时候给其加上了任务编排,增加了一个thenApply依赖

    AtomicInteger seq = new AtomicInteger(0);
    Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
    
    CompletableFuture<String> a = new CompletableFuture<>();
    CompletableFuture<String> b = a.thenApply(func);

于是我们就有了这样一个结构,A的stack中压入了一个Completion节点,该节点的源任务指向A本身,而依赖任务指向了B,表示B任务依赖A任务的执行。

接下来我们再加一条依赖

    AtomicInteger seq = new AtomicInteger(0);
    Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
    
    CompletableFuture<String> a = new CompletableFuture<>();
    CompletableFuture<String> b = a.thenApply(func);
    CompletableFuture<String> c = a.thenApply(func);

我们会发现两个特点:

  1. 和栈的性质一样,越晚添加的编排逻辑越早被执行

  2. 基于同一个对象衍生出来的流程节点的源任务是一致的

以此类推,thenXXX的其他逻辑也是类似的原理,当a调用complete函数时(无论是同步还是异步),都会依次触发A任务的stack下挂接的其他依赖任务。而只要a没有调用complete函数,那么thenApply中挂接的依赖任务无论如何都无法执行(因为a对象的result属性为空)

注意事项

避免主任务和子任务向同一个线程池中申请线程,由于存在依赖关系,当通过join来获取子任务的值时,一旦子任务由于线程队列已满进入阻塞队列,那么将会形成死锁