引言
Java异步编程极大的节省了主程序执行时间,提升了计算资源利用效率,是Java高级工程师的必备技能之一。本文围绕什么是异步,异步解决了什么问题,怎么异步编程来展开。
什么是异步
在解释异步编程之前,我们先来看同步编程的定义。同步编程,即是一种典型的请求-响应模型,当请求调用一个函数或方法后,需等待其响应返回,然后执行后续代码。同步的最大特征便是「有序」,当各个过程都执行完毕,最后返回结果。如图
异步编程则是只发送了调用的指令,调用者无需等待被调用的方法执行完毕,而是继续执行下面的流程。在一个多处理器或多核的环境中,异步调用是真正的并行执行。如图
异步解决了什么问题
Java异步编程的目的是充分利用计算机CPU资源,不让主程序阻塞在某个长时间运行的任务上,从而优化主程序的执行时间。这类耗时的任务可以是 IO操作、远程调用以及高密度计算任务。
如果不使用多线程异步编程,我们的系统就会阻塞在耗时的子任务上,会导致极大延长完成主函数任务的时间。Java以及提供了丰富的API,来完成多线程异步编程。从NIO、Future,CompletableFuture、Fork/Join以及parrallelStream。另外google的guava框架提供了ListenableFuture和Spring的@Async来简化异步编程。
怎么异步编程
本文会使用最常用的 Spring @Async说明异步编程。
@Async异步调用
- @Async也是通过AOP(切面)实现的,与@Transactional相同
- 添加@Async注释的方法必须是public。因为AOP的本质是动态代理,动态代理要求方法必须是public
- @Async必须是跨类调用,原因也是同类直接调用无法被动态代理
- 需要添加@EnableAsync注解
@Async异步调用分为两种,一种是无返回值调用,一种是有返回值调用。我们先来看最简单的无返回值调用。
TestAsyncService 调用 AsyncService中的 testAsyncSimple函数
public class TestAsyncService {
private final AsyncService asyncService;
public TestAsyncService(AsyncService asyncService) {
this.asyncService = asyncService;
}
public void testAsyncSimple() throws InterruptedException {
System.out.println("-----start-----");
asyncService.testAsyncSimple();
System.out.println("-----end-----");
}
}
@Async
public void testAsyncSimple() throws InterruptedException {
Thread.sleep(1000);
System.out.println("async success");
}
单元测试
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
class TestAsyncServiceTest {
@Autowired
private TestAsyncService testAsyncService;
@Test
void testAsyncSimple() throws InterruptedException {
testAsyncService.testAsyncSimple();
Thread.sleep(5000); //阻塞测试用例,等待异步调用结束
}
}
测试结果, end被打印出来说明主程序已经跑完。而 end在 async success之前打印出来,说明 testAsyncSimple函数确实是异步完成。
-----start-----
-----end-----
-----async success-----
有返回值调用需要用到Future
类。简单地说,Future类表示异步计算的未来结果 - 这个结果最终将在处理完成后出现在Future中。换句话说,Future表示在未来某个时间点获取执行结果,返回数据类型可以自定义。
我们先定义一个异步函数,它在3秒后将返回一个「hello world」字符串。
@Async
public Future<String> testAsyncWithResult() throws InterruptedException {
Thread.sleep(3000);
return new AsyncResult<String>("hello world !!!!");
}
在测试函数中定义 Future<String>
接受未来某个时间点返回的「hello world」。while (true)
阻塞线程等待Future
返回,future.isDone()
确认异步线程是否结束,如果结束,通过future.get()
获取返回值。当然,你也有可能会获得一个exception
异常。
public String testAsyncWithResult() throws ExecutionException, InterruptedException {
Future<String> future = asyncService.testAsyncWithResult();
while (true) {
if (future.isDone()) {
System.out.println("Result from asynchronous process - " + future.get());
return future.get();
}
System.out.println("Continue doing something else. ");
}
}
@Async线程池
默认情况下,Spring 使用 SimpleAsyncTaskExecutor
初始化线程池。每次执行客户提交给它的任务时,它会启动新的线程,并允许开发者控制并发线程的上限(concurrencyLimit),从而起到一定的资源节流作用。默认时,concurrencyLimit取值为-1,即不启用资源节流。
@Async
public void testAsyncSimple() {}
@Async也支持使用指定线程池,你可以指定线程池的各项参数,如下。
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean(ConfigConstant.TEST_EXECUTOR)
@Primary
public TaskExecutor testTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
initTaskExecutor(ConfigConstant.TEST_EXECUTOR, executor);
return executor;
}
private ThreadPoolTaskExecutor initTaskExecutor(String name, ThreadPoolTaskExecutor executor) {
executor.setThreadNamePrefix(name + "-task-");
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
/* 60 seconds */
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
@Async使用指定线程池示例
@Async(ConfigConstant.TEST_EXECUTOR)
public void testAsyncWithExecutor() {}
@Async事务
- @Transactional 调用具有 @Async 的子函数,事务不生效
- @Async 调用具有 @Transactional 的子函数,事务生效
先说第一点,如果@Transactional先于@Async切面执行,但由于spring事务管理依赖的是ThreadLocal
,所以在开启的异步线程里面感知不到事务,说细点就是在Spring开启事务之后,会设置一个连接到当前线程,但这个时候又开启了一个新线程,执行实际的SQL代码时,通过ThreadLocal获取不到连接就会开启新连接,也不会设置autoCommit
,所以这个函数整体将没有事务。
这样第二点也就很容易理解,@Async先执行,@Transactional使用的ThreadLocal依然是@Async开启的新线程中的ThreadLocal,所以事务生效。
@Async异常处理
异常处理分为无返回值和有返回值两种。
- 无返回值通过实现
AsyncConfigurer
接口来处理异常 - 有返回值通过
Future.get()
返回异常,通过正常的try...catch捕获异常即可
先来看无返回值的情况,定义异常捕获配置类AsyncExceptionConfig
,配置类里面定义SpringAsyncExceptionHandler 方法实现AsyncUncaughtExceptionHandler
接口。
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
System.out.println("------我是Async无返回方法的异常处理方法---------");
}
}
}
我们在异步代码中抛出RuntimeException()测试
@Async
public void testAsyncWithException() throws InterruptedException {
Thread.sleep(1000);
throw new RuntimeException();
}
测试结果
-----start-----
-----end-----
------我是Async无返回方法的异常处理方法---------
有返回值通过Future.get()返回异常,TestService调用上面的异常代码
public void testAsyncWithException() throws ExecutionException, InterruptedException {
System.out.println("start");
Future<String> future = asyncService.testAsyncWithException();
while (true) {
if (future.isDone()) {
System.out.println("Result from asynchronous process - " + future.get());
System.out.println("end");
}
System.out.println("Continue doing something else. ");
}
}
测试结果,异常抛出,通过正常的try...catch捕获处理即可。
Continue doing something else.
Continue doing something else.
Continue doing something else.
Continue doing something else.
java.util.concurrent.ExecutionException: java.lang.RuntimeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.example.springbootexample.service.AsyncService.TestAsyncService.testAsyncWithException(TestAsyncService.java:54)
at com.example.springbootexample.service.AsyncService.TestAsyncServiceTest.testAsyncWithException(TestAsyncServiceTest.java:50)
总结
Java异步编程极大的节省了主程序执行时间,不让主程序阻塞在某个长时间运行的任务上,从而优化主程序的执行时间。本文从最常用的Spring @Async来介绍Java异步编程。
异步函数必须是public,被跨类调用才能生效。调用分为无返回值与有返回值两种情况。有返回值通过Future类来接受返回值。通过Future.isDone()来确认异步线程是否结束。
异步线程池可以使用默认线程池,也可以初始化一个自定义线程池。
异步事务方面,@Async 调用具有 @Transactional 的子函数,事务生效。@Transactional 调用具有 @Async 的子函数,事务不生效。
异常处理方面,同样分为无返回值与有返回值两种情况。无返回值可以定义异常捕获配置类AsyncExceptionConfig处理异常。有返回值可以通过Future.get()抛出异常,随后try...catch正常捕获处理即可。
参考
从Transactional与Async注解说起:https://juejin.cn/post/6844903928799182861
本文使用 mdnice 排版