你还在写同步程序?Java异步编程了解一下

3,711

引言

Java异步编程极大的节省了主程序执行时间,提升了计算资源利用效率,是Java高级工程师的必备技能之一。本文围绕什么是异步,异步解决了什么问题,怎么异步编程来展开。

什么是异步

在解释异步编程之前,我们先来看同步编程的定义。同步编程,即是一种典型的请求-响应模型,当请求调用一个函数或方法后,需等待其响应返回,然后执行后续代码。同步的最大特征便是「有序」,当各个过程都执行完毕,最后返回结果。如图

异步编程则是只发送了调用的指令,调用者无需等待被调用的方法执行完毕,而是继续执行下面的流程。在一个多处理器或多核的环境中,异步调用是真正的并行执行。如图

异步解决了什么问题

Java异步编程的目的是充分利用计算机CPU资源,不让主程序阻塞在某个长时间运行的任务上,从而优化主程序的执行时间。这类耗时的任务可以是 IO操作、远程调用以及高密度计算任务。

如果不使用多线程异步编程,我们的系统就会阻塞在耗时的子任务上,会导致极大延长完成主函数任务的时间。Java以及提供了丰富的API,来完成多线程异步编程。从NIO、Future,CompletableFuture、Fork/Join以及parrallelStream。另外google的guava框架提供了ListenableFuture和Spring的@Async来简化异步编程。

怎么异步编程

本文会使用最常用的 Spring @Async说明异步编程。

@Async异步调用

  • @Async也是通过AOP(切面)实现的,与@Transactional相同
  • 添加@Async注释的方法必须是public。因为AOP的本质是动态代理,动态代理要求方法必须是public
  • @Async必须是跨类调用,原因也是同类直接调用无法被动态代理
  • 需要添加@EnableAsync注解

@Async异步调用分为两种,一种是无返回值调用,一种是有返回值调用。我们先来看最简单的无返回值调用。

TestAsyncService 调用 AsyncService中的 testAsyncSimple函数

public class TestAsyncService {
    private final AsyncService asyncService;

    public TestAsyncService(AsyncService asyncService) {
        this.asyncService = asyncService;
    }

    public void testAsyncSimple() throws InterruptedException {
        System.out.println("-----start-----");
        asyncService.testAsyncSimple();
        System.out.println("-----end-----");
    }
}
    @Async
    public void testAsyncSimple() throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("async success");
    }

单元测试

@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
class TestAsyncServiceTest {

    @Autowired
    private TestAsyncService testAsyncService;

    @Test
    void testAsyncSimple() throws InterruptedException {
        testAsyncService.testAsyncSimple();

        Thread.sleep(5000); //阻塞测试用例,等待异步调用结束
    }
}

测试结果, end被打印出来说明主程序已经跑完。而 end在 async success之前打印出来,说明 testAsyncSimple函数确实是异步完成。

-----start-----
-----end-----
-----async success-----

有返回值调用需要用到Future类。简单地说,Future类表示异步计算的未来结果 - 这个结果最终将在处理完成后出现在Future中。换句话说,Future表示在未来某个时间点获取执行结果,返回数据类型可以自定义。

我们先定义一个异步函数,它在3秒后将返回一个「hello world」字符串。

    @Async
    public Future<String> testAsyncWithResult() throws InterruptedException {
        Thread.sleep(3000);
        return new AsyncResult<String>("hello world !!!!");
    }

在测试函数中定义 Future<String> 接受未来某个时间点返回的「hello world」。while (true)阻塞线程等待Future返回,future.isDone()确认异步线程是否结束,如果结束,通过future.get()获取返回值。当然,你也有可能会获得一个exception异常。

public String testAsyncWithResult() throws ExecutionException, InterruptedException {
        Future<String> future = asyncService.testAsyncWithResult();
        while (true) {
            if (future.isDone()) {
                System.out.println("Result from asynchronous process - " + future.get());
                return future.get();
            }
            System.out.println("Continue doing something else. ");
        }
    }

@Async线程池

默认情况下,Spring 使用 SimpleAsyncTaskExecutor初始化线程池。每次执行客户提交给它的任务时,它会启动新的线程,并允许开发者控制并发线程的上限(concurrencyLimit),从而起到一定的资源节流作用。默认时,concurrencyLimit取值为-1,即不启用资源节流。

    @Async
    public void testAsyncSimple() {}

@Async也支持使用指定线程池,你可以指定线程池的各项参数,如下。

@Configuration
@EnableAsync
public class ThreadPoolConfig {

    @Bean(ConfigConstant.TEST_EXECUTOR)
    @Primary
    public TaskExecutor testTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        initTaskExecutor(ConfigConstant.TEST_EXECUTOR, executor);
        return executor;
    }

    private ThreadPoolTaskExecutor initTaskExecutor(String name, ThreadPoolTaskExecutor executor) {
        executor.setThreadNamePrefix(name + "-task-");
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        /* 60 seconds */
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

@Async使用指定线程池示例

    @Async(ConfigConstant.TEST_EXECUTOR)
    public void testAsyncWithExecutor() {}

@Async事务

  • @Transactional 调用具有 @Async 的子函数,事务不生效
  • @Async 调用具有 @Transactional 的子函数,事务生效

先说第一点,如果@Transactional先于@Async切面执行,但由于spring事务管理依赖的是ThreadLocal,所以在开启的异步线程里面感知不到事务,说细点就是在Spring开启事务之后,会设置一个连接到当前线程,但这个时候又开启了一个新线程,执行实际的SQL代码时,通过ThreadLocal获取不到连接就会开启新连接,也不会设置autoCommit,所以这个函数整体将没有事务。

这样第二点也就很容易理解,@Async先执行,@Transactional使用的ThreadLocal依然是@Async开启的新线程中的ThreadLocal,所以事务生效。

@Async异常处理

异常处理分为无返回值和有返回值两种。

  • 无返回值通过实现AsyncConfigurer接口来处理异常
  • 有返回值通过Future.get()返回异常,通过正常的try...catch捕获异常即可

先来看无返回值的情况,定义异常捕获配置类AsyncExceptionConfig,配置类里面定义SpringAsyncExceptionHandler 方法实现AsyncUncaughtExceptionHandler 接口。

@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SpringAsyncExceptionHandler();
    }

    class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
            System.out.println("------我是Async无返回方法的异常处理方法---------");
        }
    }
}

我们在异步代码中抛出RuntimeException()测试

    @Async
    public void testAsyncWithException() throws InterruptedException {
        Thread.sleep(1000);
        throw new RuntimeException();
    }

测试结果

-----start-----
-----end-----
------我是Async无返回方法的异常处理方法---------

有返回值通过Future.get()返回异常,TestService调用上面的异常代码

public void testAsyncWithException() throws ExecutionException, InterruptedException {
        System.out.println("start");
        Future<String> future = asyncService.testAsyncWithException();
        while (true) {
            if (future.isDone()) {
                System.out.println("Result from asynchronous process - " + future.get());
                System.out.println("end");
            }
            System.out.println("Continue doing something else. ");
        }
    }

测试结果,异常抛出,通过正常的try...catch捕获处理即可。

Continue doing something else. 
Continue doing something else. 
Continue doing something else. 
Continue doing something else. 

java.util.concurrent.ExecutionException: java.lang.RuntimeException

 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at com.example.springbootexample.service.AsyncService.TestAsyncService.testAsyncWithException(TestAsyncService.java:54)
 at com.example.springbootexample.service.AsyncService.TestAsyncServiceTest.testAsyncWithException(TestAsyncServiceTest.java:50)

总结

Java异步编程极大的节省了主程序执行时间,不让主程序阻塞在某个长时间运行的任务上,从而优化主程序的执行时间。本文从最常用的Spring @Async来介绍Java异步编程。

异步函数必须是public,被跨类调用才能生效。调用分为无返回值与有返回值两种情况。有返回值通过Future类来接受返回值。通过Future.isDone()来确认异步线程是否结束。

异步线程池可以使用默认线程池,也可以初始化一个自定义线程池。

异步事务方面,@Async 调用具有 @Transactional 的子函数,事务生效。@Transactional 调用具有 @Async 的子函数,事务不生效。

异常处理方面,同样分为无返回值与有返回值两种情况。无返回值可以定义异常捕获配置类AsyncExceptionConfig处理异常。有返回值可以通过Future.get()抛出异常,随后try...catch正常捕获处理即可。

参考

从Transactional与Async注解说起:https://juejin.cn/post/6844903928799182861

本文使用 mdnice 排版