Spring@Async异步线程的用法

1,419 阅读3分钟

最近在代码中使用@Async来进行异步抄单,进行压测时出现了OOM问题,通过日志信息看到了:unable to create new native thread

一、内存溢出的三种类型:

  • 1、第一种OutOfMemoryError:PermGen space,发生这种问题的原因是程序中使用了大量的jar或class
  • 2、第二种OutOfMemoryError:Java heap space ,发生这种问题的原因是Java虚拟机创建的对象太多
  • 3、第三种OutOfMemoryError:unable to create new native thread,创建线程太多,占用内存多大

二、问题分析

初步分析

初步怀疑是创建的线程太多导致的,使用jstack 线程PID 分析打印的日志,发现大量线程处于Runnable状态,基本可以确定是创建线程太多导致的。

代码分析

出问题的服务是报案的服务,在进行抄单时在方法上使用了@Asycn这个注解,进行异步抄单,通过trace-log日志可以看出创建了很多的Thread,经过简单的了解@Async异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,在压测的情况下,会有大量的请求去抄单,这时会不断创建大量的线程,极有可能压爆服务器内存。

借此机会学习一下SimpleAsyncTaskExecutor的源码,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务,核心代码如下:

@Override
	public void execute(Runnable task, long startTimeout) {
		Assert.notNull(task, "Runnable must not be null");
		Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
		if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
			this.concurrencyThrottle.beforeAccess();
			doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
		}
		else {
			doExecute(taskToUse);
		}
	}

SimpleAsyncTaskExecutor限流实现:

首选任务进来,会循环判断当前执行线程数是否超过concurrencyLimit,如果超了,则当前线程调用wait方法,释放monitor对象锁,进入等待

protected void beforeAccess() {
		if (this.concurrencyLimit == NO_CONCURRENCY) {
			throw new IllegalStateException(
					"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
		}
		if (this.concurrencyLimit > 0) {
			boolean debug = logger.isDebugEnabled();
			synchronized (this.monitor) {
				boolean interrupted = false;
				while (this.concurrencyCount >= this.concurrencyLimit) {
					if (interrupted) {
						throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
								"but concurrency limit still does not allow for entering");
					}
					if (debug) {
						logger.debug("Concurrency count " + this.concurrencyCount +
								" has reached limit " + this.concurrencyLimit + " - blocking");
					}
					try {
						this.monitor.wait();
					}
					catch (InterruptedException ex) {
						// Re-interrupt current thread, to allow other threads to react.
						Thread.currentThread().interrupt();
						interrupted = true;
					}
				}
				if (debug) {
					logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
				}
				this.concurrencyCount++;
			}
		}
	}

线程任务执行完毕后,当执行线程数会减一,会调用monitor对象的notify方法,唤醒等待状态下的线程,等待状态下的线程会竞争monitor锁,竞争到,会继续执行线程任务。

protected void afterAccess() {
		if (this.concurrencyLimit >= 0) {
			synchronized (this.monitor) {
				this.concurrencyCount--;
				if (logger.isDebugEnabled()) {
					logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
				}
				this.monitor.notify();
			}
		}
	}

最终的解决办法:使用自定义线程池

@Configuration
@Slf4j
public class AppConfig implements AsyncConfigurer {
    public static final String
            ASYNC_EXECUTOR_NAME = "asyncExecutor";


    @Bean(name = ASYNC_EXECUTOR_NAME)
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数,默认值为1
        threadPoolTaskExecutor.setCorePoolSize(10);
        //最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(20);
        //缓存队列
        threadPoolTaskExecutor.setQueueCapacity(256);
        //超出核心线程数之外的线程在空闲时间最大的存活时间
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
        //线程的前缀
        threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
        //是否等待所有的线程关闭之后采取关闭线程池,默认是false
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //等待时长
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        //拒绝策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }


    /**
     * <h2>定义异步任务异常处理类</h2>
     *
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }

    class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            log.info("AsycError:{},Method :{},Param:{}", ex.getMessage(), method.getName(), JSON.toJSON(params));
            ex.printStackTrace();
            //TODO 发送邮件或者发送短信
        }
    }
}

就不会出现一直创建Thread的情况,导致OOM。