Java异步编程之CompletableFuture

2,198 阅读8分钟

我正在参加「掘金·启航计划」

异步任务

Future获取异步任务结果

利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。

其类结构如下:

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  
  boolean isCancelled();

  boolean isDone();

  V get() throws InterruptedException, ExecutionException;

  V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
 }

方法解释如下:

Future备注
cancel(boolean)尝试取消异步任务的执行。如果任务已经执行完成、已经被取消、因为某种原因不能被取消,则返回false;如果任务正在执行,并且mayInterruptIfRunning为true,那么会调用interrupt()尝试打断任务。该方法返回结果后,isDone()总会返回true
isCancelled()如果在任务完成前被取消,返回true
isDone()如果任务完成则返回true。任务完成包括正常结束、任务被取消、任务发生异常,都返回true
get()获取异步任务执行结果,如果没有返回,则阻塞等待
get(long timeout, TimeUnit unit)在给定的时间内等待获取异步任务结果,如果超时还未获取到结果,则会抛出TimeoutException

但它本身只是一个接口,还需要看Future的具体实现类:

红色框起来的就是常见的实现类,就不一一展开赘述了,这里以FutureTask的使用为例:

 //初始化一个线程池
 ExecutorService executor = = Executors.newSingleThreadExecutor();
 //new 一个Callable并传入FutureTask
 FutureTask<String> future =new FutureTask<>(new Callable<String>() {
     public String call() {
        //do something
       return result;
   }});
 executor.execute(future);
 //在异步任务执行期间可以做一些其他的事情
 displayOtherThings();
 //通过future.get()得到异步任务执行结果
 String result=future.get();

CompletableFuture异步编程

CompletableFuture的线程池之坑

CompletableFuture又是如何创建新线程的?答案是ForkJoinPool.commonPool() ,我们熟悉的老朋友又回来了,还是它。当需要新的线程时,CompletableFuture会从commonPool中获取线程,相关源码如下:

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

我们已经知道了commonPool的潜在风险,在生产环境中使用无异于给自己挖坑换句话说,当我决定使用CompletableFuture的时候,默认就是我们要创建自己的线程池。不要偷懒,更不要存在侥幸心理。

这里补充一点:CompletableFuture是否使用默认线程池的依据,和机器的CPU核心数有关。

当CPU核心数减1大于1时,才会使用默认的线程池(ForkJoinPool),否则将会为每个CompletableFuture的任务创建一个新线程去执行

即,CompletableFuture的默认线程池,只有在双核以上的机器内才会使用。在双核及以下的机器中,会为每个任务创建一个新线程,等于没有使用线程池,且有资源耗尽的风险

注意事项

  1. 如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
  2. 记得考虑异常处理。
  3. 公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数-1(也可以通过JVM option:**-**Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)

创建 CompletableFuture 对象

CompletableFuture的核心方法总共分为四类,而这四类方法又分为两种模式:同步和异步

类型接收参数返回结果支持异步
Supply✔︎✔︎
Run✔︎
Accept✔︎✔︎
Apply✔︎✔︎✔︎

上述接种类型的方法一般都有三个变种方法:同步异步指定线程池。比如, thenApply() 的三个变种方法如下所示:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

第一个同步,第二个异步,第三个支持指定线程池

如何理解 CompletionStage 接口

任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。如下图

还要注意参数类型:

  • 参数 fn 的类型是接口 Function<T, R>,代表这个方法既能接收参数也支持返回值
  • 参数 consumer 的类型是接口 Consumer,代表这个方法虽然支持接收参数,但却不支持返回值
  • 参数 action 的类型是 Runnable,代表这个方法既不能接收参数也不支持返回值。

上述参数类型的方法一般都有三个变种方法:同步、异步和指定线程池。方法后缀带 Async 代表的是异步执行

描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口。

both:前两阶段同时执行完毕执行下一阶段

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口。

either:前两阶段任一执行完毕执行下一阶段

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

案例:李四下班后坐公交回家,可以选择 111 路和 222 路,当其中有一辆公交到达,李四就选择坐这辆车回家,这种场景可以使用 applyToEither 方法

public class ApplyToEitherTest {
    public static void main(String[] args) {
        PrintTool.printTimeAndThread("李四下班准备回家。。。");
        PrintTool.printTimeAndThread("李四等待111,222公交。。。");
        CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {
            PrintTool.printTimeAndThread("111 路公交在路上。。。");
            PrintTool.sleep(5000);
            return "906";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            PrintTool.printTimeAndThread("222 路公交在路上。。。");
            PrintTool.sleep(4000);
            return "539";
        }), first -> first + "路公交");
        PrintTool.printTimeAndThread("李四坐上" + busCF.join());
    }
}

allOf与anyOf

allOf()anyOf() 也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf() :给定一组任务,等待所有任务执行结束;
  • anyOf() :给定一组任务,等待其中任一任务执行结束。

异常处理

虽然fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

使用exceptionally()回调处理异常

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> 7 / 0).thenApply(r -> r * 10)
        .exceptionally(e -> 0);

使用handle()处理异常

既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> 7 / 0).thenApply(r -> r * 10)
        .handle((res, ex) -> {
            if (ex != null) {
                System.out.println("出错:" + ex.getMessage());
                return 1;
            }
            return 0;
        });

CompletionService批量执行异步任务

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。

利用 CompletionService 实现 Forking Cluster

Dubbo 中有一种叫做Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了

利用 CompletionService 可以快速实现 Forking 这种集群模式,比如下面的示例代码就展示了具体是如何实现的。首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。

通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures = new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures 
futures.add(cs.submit(() -> geocoderByS1()));
futures.add(cs.submit(() -> geocoderByS2()));
futures.add(cs.submit(() -> geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
    // 只要有一个成功返回,则 break
    for (int i = 0; i < 3; ++i) {
        r = cs.take().get();
        // 简单地通过判空来检查是否成功返回
        if (r != null) {
            break;
        }
    }
} finally {
    // 取消所有任务
    for (Future<Integer> f : futures)
        f.cancel(true);
}
// 返回结果
return r;

总结

对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过CompletableFuture来解决;而批量的并行任务,则可以通过CompletionService来解决。