如何编写优雅的异步代码 — CompletableFuture

8,824 阅读9分钟

前言

在我们的意识里,同步执行的程序都比较符合人们的思维方式,而异步的东西通常都不好处理。在异步计算的情况下,以回调表示的动作往往会分散在代码中,也可能相互嵌套在内部,如果需要处理其中一个步骤中可能发生的错误时,情况变得更加糟糕。Java 8 引入了很多的新特性,其中就包含了 CompletableFuture 类的引入,这让我们编写清晰可读的异步代码变得更加容易,该类功能非常强大,包含了超过 50 多个方法。。。

什么是 CompletableFuture

CompletableFuture 类的设计灵感来自于 Google GuavaListenableFuture 类,它实现了 FutureCompletionStage 接口并且新增了许多方法,它支持 lambda,通过回调利用非阻塞方法,提升了异步编程模型。它允许我们通过在与主应用程序线程不同的线程上(也就是异步)运行任务,并向主线程通知任务的进度、完成或失败,来编写非阻塞代码。

为什么要引入 CompletableFuture

Java 的 1.5 版本引入了 Future,你可以把它简单的理解为运算结果的占位符,它提供了两个方法来获取运算结果。

  • get():调用该方法线程将会无限期等待运算结果。
  • get(long timeout, TimeUnit unit):调用该方法线程将仅在指定时间 timeout 内等待结果,如果等待超时就会抛出 TimeoutException 异常。

Future 可以使用 RunnableCallable 实例来完成提交的任务,通过其源码可以看出,它存在如下几个问题:

  • 阻塞 调用 get() 方法会一直阻塞,直到等待直到计算完成,它没有提供任何方法可以在完成时通知,同时也不具有附加回调函数的功能。
  • 链式调用和结果聚合处理 在很多时候我们想链接多个 Future 来完成耗时较长的计算,此时需要合并结果并将结果发送到另一个任务中,该接口很难完成这种处理。
  • 异常处理 Future 没有提供任何异常处理的方式。

以上这些问题在 CompletableFuture 中都已经解决了,接下来让我们看看如何去使用 CompletableFuture

如何创建 CompletableFuture

最简单的创建方式就是调用 CompletableFuture.completedFuture(U value) 方法来获取一个已经完成的 CompletableFuture 对象。

@Test
public void testSimpleCompletableFuture() {
    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio");
    assertTrue(completableFuture.isDone());
    try {
        assertEquals("Hello mghio", completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

需要注意的是当我们对不完整的 CompleteableFuture调用 get 方法的话,会由于 Future 未完成,因此 get 调用将永远阻塞,此时可以使用 CompletableFuture.complete 方法手动完成 Future

任务异步处理

当我们想让程序在后台异步执行任务而不关心任务的处理结果时,可以使用 runAsync 方法,该方法接收一个 Runnable 类型的参数返回 CompletableFuture<Void>

@Test
public void testCompletableFutureRunAsync() {
    AtomicInteger variable = new AtomicInteger(0);
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
    runAsync.join();
    assertEquals(100, variable.get());
}

public void process(AtomicInteger variable) {
    System.out.println(Thread.currentThread() + " Process...");
    variable.set(100);
}

如果我们想让任务在后台异步执行而且需要获取任务的处理结果时,可以使用 supplyAsync 方法,该方法接收一个 Supplier<T> 类型的参数返回一个 CompletableFuture<T>

@Test
public void testCompletableFutureSupplyAsync() {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
} 

看到这里你可能会有个问题,上面执行 runAsyncsupplyAsync 任务的线程是从哪里来的、谁创建的呢?实际上它和 Java 8 中的 parallelStream 类似, CompletableFuture 也是从全局 ForkJoinPool.commonPool() 获得的线程中执行这些任务的。同时,上面的两个方法也提供了自定义线程池去执行任务,其实你如果去了解过 CompletableFuture 的源码的话,你会发现其 API 中的所有方法都有个重载的版本,有或没有自定义 Executor 执行器。

@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
}

链式调用和结果聚合处理

我们知道 CompletableFutureget() 方法会一直阻塞直到获取到结果,CompletableFuture 提供了 thenApplythenAcceptthenRun 等方法来避免这种情况,而且我们还可以添加任务完成后的回调通知。这几个方法的使用场景如下:

  • thenApply 当我们如果要在从 Future 接收值后任务之前运行自定义的业务代码,然后要为此任务返回一些值时,则可以使用该方法
  • thenAccept 如果我们希望在从 Future 接收到一些值后执行任务之前运行自定义的业务代码而不关心返回结果值时,则可以使用该方法
  • thenRun 如果我们想在Future完成后运行自定义的业务代码,并且不想为此返回任何值时,则可以使用该方法
@Test
public void testCompletableFutureThenApply() {
    Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
        .thenApply(this::thenApplyNotify) // Non Blocking
        .join();
    assertEquals(new Integer(1), notificationId);
}

@Test
public void testCompletableFutureThenAccept() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenAccept(this::thenAcceptNotify) // Non Blocking
        .join();
    assertEquals(100, variable.get());
}

@Test
public void testCompletableFutureThenRun() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenRun(this::thenRunNotify)
        .join();
    assertEquals(100, variable.get());
}

private String processVariable() {
    variable.set(100);
    return "success";
}

private void thenRunNotify() {
    System.out.println("thenRun completed notify ....");
}

private Integer thenApplyNotify(Integer integer) {
    return integer;
}

private void thenAcceptNotify(String s) {
    System.out.println(
    String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}

public Integer thenApplyProcess() {
    return 1;
}

如果有大量的异步计算,那么我们可以继续将值从一个回调传递到另一个回调中去,也就是使用链式调用方式,使用方式很简单。

@Test
public void testCompletableFutureThenApplyAccept() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .thenAccept((i) -> notifyByEmail()).join();
}

private void notifyByEmail() {
    // business code
    System.out.println("send notify by email ...");
}

private Double notifyBalance(Double d) {
    // business code
    System.out.println(String.format("your balance is $%s", d));
    return 9527D;
}

private Double calculateBalance(Object o) {
    // business code
    return 9527D;
}

private Double findAccountNumber() {
    // business code
    return 9527D;
}

比较细心的朋友可能注意到在所有前面的几个方法示例中,所有方法都是在同一线程上执行的。如果我们希望这些任务在单独的线程上运行时,那么我们可以使用这些方法对应的异步版本。

@Test
public void testCompletableFutureApplyAsync() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
        .newSingleThreadScheduledExecutor();
    CompletableFuture<Double> completableFuture =
        CompletableFuture
            .supplyAsync(this::findAccountNumber,
                newFixedThreadPool) // 从线程池 newFixedThreadPool 获取线程执行任务
            .thenApplyAsync(this::calculateBalance,
                newSingleThreadScheduledExecutor)
            .thenApplyAsync(this::notifyBalance);
    Double balance = completableFuture.join();
    assertEquals(9527D, balance);
}

执行结果处理

thenCompose 方法适合有依赖性的任务处理,比如一个计算账户余额的业务:首先我们要先找到帐号,然后为该帐户计算余额,然后计算完成后再发送通知。所有这些任务都是依赖前一个任务的返回 CompletableFuture 结果,此时我们需要使用 thenCompose 方法,其实有点类似于 Java 8 流的 flatMap 操作。

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
    assertEquals(9527D, balance);
}

private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doCalculateBalance(Double d) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doFindAccountNumber() {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private void sleepSeconds(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

thenCombine 方法主要是用于合并多个独立任务的处理结果。假设我们需要查找一个人的姓名和住址,则可以使用不同的任务来分别获取,然后要获得这个人的完整信息(姓名 + 住址),则需要合并这两种方法的结果,那么我们可以使用 thenCombine 方法。

@Test
public void testCompletableFutureThenCombine() {
    CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
    String personInfo = thenCombine.join();
    assertEquals("mghio Shanghai, China", personInfo);
}

private CompletableFuture<String> findAddress() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "Shanghai, China";
    });
}

private CompletableFuture<String> findName() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio ";
    });
}

等待多个任务执行完成

在许多情况下,我们希望并行运行多个任务,并在所有任务完成后再进行一些处理。假设我们要查找 3 个不同用户的姓名并将结果合并。此时就可以使用 CompletableFuture 的静态方法 allOf,该方法会等待所有任务完成,需要注意的是该方法它不会返回所有任务的合并结果,因此我们必须手动组合任务的执行结果。

@Test
public void testCompletableFutureAllof() {
    List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);
    IntStream.range(0, 3).forEach(num -> list.add(findName(num)));

    CompletableFuture<Void> allFuture = CompletableFuture
        .allOf(list.toArray(new CompletableFuture[0]));

    CompletableFuture<List<String>> allFutureList = allFuture
        .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

    CompletableFuture<String> futureHavingAllValues = allFutureList
        .thenApply(fn -> String.join("", fn));

    String result = futureHavingAllValues.join();
    assertEquals("mghio0mghio1mghio2", result);
}

private CompletableFuture<String> findName(int num) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio" + num;
    });
} 

异常处理

在多线程中程序异常其实不太好处理,但是幸运的是在 CompletableFuture 中给我们提供了很方便的异常处理方式,在我们上面的例子代码中:

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
}

在上面的代码中,三个方法 doFindAccountNumberdoCalculateBalancedoSendNotifyBalance 只要任意一个发生异常了,则之后调用的方法将不会运行。 CompletableFuture 提供了三种处理异常的方式,分别是 exceptionallyhandlewhenComplete 方法。第一种方式是使用 exceptionally 方法处理异常,如果前面的方法失败并发生异常,则会调用异常回调。

@Test
public void testCompletableFutureExceptionally() {
    CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .exceptionally(ex -> {
            System.out.println("Exception " + ex.getMessage());
            return 0D;
        });
    Double join = thenApply.join();
    assertEquals(9527D, join);
}

第二种方式是使用 handle 方法处理异常,使用该方式处理异常比上面的 exceptionally 方式更为灵活,我们可以同时获取到异常对象和当前的处理结果。

@Test
public void testCompletableFutureHandle() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .handle((ok, ex) -> {
            System.out.println("最终要运行的代码...");
            if (ok != null) {
            System.out.println("No Exception !!");
            } else {
            System.out.println("Exception " + ex.getMessage());
            return -1D;
            }
            return ok;
        });
}

第三种是使用 whenComplete 方法处理异常。

@Test
public void testCompletableFutureWhenComplete() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .whenComplete((result, ex) -> {
            System.out.println("result = " + result + ", ex = " + ex);
            System.out.println("最终要运行的代码...");
        });
}

总结

在本文中,介绍了 CompletableFuture 类的部分方法和使用方式,这个类的方法很多同时提供的功能也非常强大,在异步编程中使用的比较多,熟悉了基本的使用方法之后要深入了解还是要深入源码分析其实现原理。