Java并发编程入门(十九)异步任务调度工具CompleteFeature

6,370 阅读8分钟

banner窄.png

铿然架构  |  作者  /  铿然一叶 这是铿然架构的第 64 篇原创文章

相关阅读:

Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)创建线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十一)限流场景和Spring限流器实现
Java并发编程入门(十二)生产者和消费者模式-代码模板
Java并发编程入门(十三)读写锁和缓存模板
Java并发编程入门(十四)CountDownLatch应用场景
Java并发编程入门(十五)CyclicBarrier应用场景
Java并发编程入门(十六)秒懂线程池差别
Java并发编程入门(十七)一图掌握线程常用类和接口
Java并发编程入门(十八)再论线程安全
Java并发编程入门(二十)常见加锁场景和加锁工具


1. CompleteFeature简介

CompleteFeature是对Feature的增强,Feature只能处理简单的异步任务,而CompleteFeature可以将多个异步任务进行复杂的组合,支持串行执行,并行执行,And汇聚,Or汇聚,从而能对复杂的关联任务进行调度。

2. CompleteFeature支持的业务场景

2.1. 串行任务

串行任务指任务B要等待任务A执行完成之后才会执行,串行任务有如下属性:

属性描述
可获取A的结果任务B可获取任务A的执行结果作为参数使用
B有返回值如果任务B有返回值,可以将执行结果通过返回值返回
可获取A异常任务B可以获取任务A抛出的异常
A异常则终止当任务A抛出异常后,程序是否会终止,若会终止,程序将退出,任务B不会执行,否则程序不会退出,继续执行。

CompleteFeature支持的串行任务方法如下:

方法可获取A的结果B有返回值可获取A异常A异常则终止
thenRun
thenApply
thenAccept
thenCompose
whenComplete
exceptionally
handle

总结:

  1. 任务不会抛出异常就使用前四个方法,否则使用后三个方法。
  2. exceptionally相当于try {} catch {}的catch部分,whenComplete和handle相当于try {} catch {} finally {} 的catch和finall部分,区别是一个有返回值,一个没有返回值。
  3. thenApply和thenCompose的区别是,thenCompose在任务B中返回的是CompletableFuture,可参考后面的例子对比差异。

1.2. And汇聚关系

And汇聚关系是指:任务C要等待任务A或任务B都执行完后才执行。CompleteFeature支持此关系的方法如下:

方法C接收A或B返回值作为参数C有返回值
thenCombine
thenAcceptBoth
runAfterBoth

1.3. Or汇聚关系

Or汇聚关系是指:任务C等待任务A或任务B其中一个执行完后就执行,即C只需等待最先执行完成的任务后就可执行。CompleteFeature支持此关系的方法如下:

方法C接收A或B返回值作为参数C有返回值
applyToEither
acceptEither
runAfterEither

1.4. 多任务

CompletableFuture提供了两个多任务的方法:

方法描述
anyOf多个任务中的任意一个任务执行完则结束,可以获取到最先执行完的任务的返回值。
allOf多个任务都执行完后才结束,不能获取到任何一个任务的返回值

以上所有方法的返回值都是CompletableFuture,这样就可以继续调用前面描述的方法来进行任务组合,组合出更加复杂的任务处理流程。

1.5. 方法族

以上方法中的最后一个任务都是和前面的任务在一个线程内执行,CompletableFuture中还有一套方法让最后一个任务在新线程中执行,只要在原方法上加上Async后缀则可,例如:

同步异步
thenApplythenApplyAsync
thenAcceptthenAcceptAsync
thenRunthenRunAsync
thenComposethenComposeAsync
具体还有哪些,可参考源码。

2. 代码例子

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompleteFeatureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        simpleTask();
        serialTask();
        andTask();
        orTask();
        complexTask();

        sleep(2000); // 等待子线程结束
        System.out.println("end.");

    }

    private static void simpleTask() throws ExecutionException, InterruptedException {
        // 1. runAsync 执行一个异步任务,没有返回值
        CompletableFuture.runAsync(()-> System.out.println("1. runAsync"));
        sleep(100);

        // 2. supplyAsync 执行一个异步任务,有返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            System.out.println("2.1 supplyAsync task be called");
            sleep(100);
            return "2.2 supplyAsync return value";
        });
        System.out.println("2.3 after supplyAsync");
        System.out.println(future.get());
        sleep(200);
    }

    private static void serialTask() throws ExecutionException, InterruptedException {
        // 3. thenRun
        CompletableFuture.supplyAsync(()->{
            System.out.println("3.1 supplyAsync begin");
            sleep(100);  // 用于证明B等待A结束才会执行
            return "3.2 supplyAsync end";
        }).thenRun(()->{
            System.out.println("3.3 thenRun be called.");
        });
        sleep(200);

        // 4. thenApply
        CompletableFuture<String> future4 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "4.1 apple";
        }).thenApply(returnVal->{
            return "4.2 " + returnVal + "-苹果";
        });
        System.out.println("4.3 get: " + future4.get());
        sleep(100);

        // 5. thenAccept
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "5.1 orange";
        }).thenAccept(returnVal->{
            System.out.println("5.2 " + returnVal + "-桔子");
        });
        sleep(100);

        // 6. thenCompose
        CompletableFuture<String> future6 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "6.1 apple";
        }).thenCompose((returnVal)->{
            return CompletableFuture.supplyAsync(()->{
                return "6.2 " + returnVal;
            });
        });
        System.out.println("6.3 get: " + future6.get());
        sleep(100);

        // 7. whenComplete
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (true) {  //修改boolean值观察不同结果
                return "7.1 return value for whenComplete";
            } else {
                throw new RuntimeException("7.2 throw exception for whenComplete");
            }
        }).whenComplete((returnVal, throwable)->{
            System.out.println("7.2 returnVal: " + returnVal);  // 可以直接拿到返回值,不需要通过future.get()得到
            System.out.println("7.3 throwable: " + throwable);  // 异步任务抛出异常,并不会因为异常终止,而是会走到这里,后面的代码还会继续执行
        });
        sleep(100);

        // 8. exceptionally
        CompletableFuture<String> future8 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (false) {  //修改boolean值观察不同结果
                return "8.1 return value for exceptionally";
            } else {
                throw new RuntimeException("8.2 throw exception for exceptionally");
            }
        }).exceptionally(throwable -> {
            throwable.printStackTrace();
            return "8.3 return value after dealing exception.";
        });
        System.out.println("8.4 get: " + future8.get());
        sleep(100);

        // 9. handle
        CompletableFuture<String> future9 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (false) {  //修改boolean值观察不同结果
                return "9.1 return value for handle";
            } else {
                throw new RuntimeException("9.2 throw exception for handle");
            }
        }).handle((retuanVal, throwable)->{
            System.out.println("9.3 retuanVal: " + retuanVal);
            System.out.println("9.4 throwable: " + throwable);
            return "9.5 new return value.";
        });
        System.out.println("9.6 get: " + future9.get());
        sleep(100);
    }

    private static void andTask() throws ExecutionException, InterruptedException {
        // 10. thenCombine 合并结果
        CompletableFuture<String> future10 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "10.1 TaskA return value";
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "10.2 TaskB return value";
        }), (taskAReturnVal, taskBReturnVal) -> taskAReturnVal + taskBReturnVal);
        System.out.println("10.3 get: " + future10.get());
        sleep(200);

        // 11. thenAcceptBoth
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "11.1 TaskA return value";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "11.2 TaskB return value";
        }), (taskAReturnVal, taskBReturnVal) -> System.out.println(taskAReturnVal + taskBReturnVal));
        sleep(200);

        // 12. runAfterBoth A,B都执行完后才执行C,C不关心前面任务的返回值
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
            System.out.println("12.1 TaskA be called.");
            return "12.2 TaskA return value";
        }).runAfterBoth(CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("12.3 TaskB be called.");
            return "12.4 TaskB return value";
        }), () -> System.out.println("12.5 TaskC be called."));
        sleep(300);
    }

    private static void orTask() throws ExecutionException, InterruptedException {
        // 13. applyToEither 使用A,B两个异步任务优先返回的结果
        CompletableFuture<String> future13 = CompletableFuture.supplyAsync(()->{
            sleep(200);  // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
            System.out.println("13.1 TaskA be called"); // 用于证明拿到B的结果后,A还会继续执行,并不会终止
            return "13.2 TaskA return value";
        }).applyToEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "13.3 TaskB return value";
        }), (returnVal) -> returnVal);
        System.out.println("13.4 get: " + future13.get());
        sleep(300);

        // 14. acceptEither 使用A,B两个异步任务优先返回的结果
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
            return "14.1 TaskA return value";
        }).acceptEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "14.2 TaskB return value";
        }), (returnVal) -> System.out.println(returnVal));
        sleep(300);

        // 15. runAfterEither A,B任意一个执行完后就执行C,C不关心前面任务的返回值
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 虽然这个任务先执行,但是执行时间比下面的任务长,所以最后会使用下面的返回结果
            System.out.println("15.1 TaskA be called.");
            return "15.2 TaskA return value";
        }).runAfterEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("15.3 TaskB be called.");
            return "15.4 TaskB return value";
        }), () -> System.out.println("15.5 TaskC be called."));
        sleep(300);
    }

    private static void complexTask() throws ExecutionException, InterruptedException {
        // 16. anyOf
        CompletableFuture future16 = CompletableFuture.anyOf(CompletableFuture.supplyAsync(()->
        {
            sleep(300);
            System.out.println("16.1 TaskA be called.");
            return "16.2 TaskA return value.";
        }), CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("16.3 TaskB be called.");
            return "16.4 TaskB return value.";
        }));
        System.out.println("16.5 get: " + future16.get());
        sleep(400);

        // 17. allOf
        CompletableFuture<Void> future17 = CompletableFuture.allOf(CompletableFuture.supplyAsync(()->
        {
            sleep(300);
            System.out.println("17.1 TaskA be called.");
            return "17.2 TaskA return value.";
        }), CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("17.3 TaskB be called.");
            return "17.4 TaskB return value.";
        }));
        System.out.println("17.5 get: " + future17.get()); // allOf没有返回值
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}

输出日志:

1. runAsync
2.3 after supplyAsync
2.1 supplyAsync task be called
2.2 supplyAsync return value
3.1 supplyAsync begin
3.4
3.5 xxx
3.6 AAA
3.3 thenRun be called.
4.3 get: 4.2 4.1 apple-苹果
5.2 5.1 orange-桔子
6.3 get: 6.2 6.1 apple
7.2 returnVal: 7.1 return value for whenComplete
7.3 throwable: null
java.util.concurrent.CompletionException: java.lang.RuntimeException: 8.2 throw exception for exceptionally
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: 8.2 throw exception for exceptionally
	at com.javageektour.hikaricp.demo.CompleteFeatureDemo.lambda$serialTask$14(CompleteFeatureDemo.java:101)
	at com.javageektour.hikaricp.demo.CompleteFeatureDemo?Lambda$14/769287236.get(Unknown Source)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
	... 5 more
8.4 get: 8.3 return value after dealing exception.
9.3 retuanVal: null
9.4 throwable: java.util.concurrent.CompletionException: java.lang.RuntimeException: 9.2 throw exception for handle
9.6 get: 9.5 new return value.
10.3 get: 10.1 TaskA return value10.2 TaskB return value
11.1 TaskA return value11.2 TaskB return value
12.3 TaskB be called.
12.1 TaskA be called.
12.5 TaskC be called.
13.4 get: 13.3 TaskB return value
13.1 TaskA be called
14.2 TaskB return value
15.3 TaskB be called.
15.5 TaskC be called.
15.1 TaskA be called.
16.3 TaskB be called.
16.5 get: 16.4 TaskB return value.
16.1 TaskA be called.
17.3 TaskB be called.
17.1 TaskA be called.
17.5 get: null
end.

3. 总结

CompleteFeature支持复杂的异步任务调度,支持多个任务串行,并行,汇聚,当多个异步任务有依赖关系时,通过CompleteFeature来调度任务可以大大简化代码和提高执行性能。

end.


<--阅过留痕,左边点赞!