最近在代码中使用@Async来进行异步抄单,进行压测时出现了OOM问题,通过日志信息看到了:unable to create new native thread
一、内存溢出的三种类型:
- 1、第一种OutOfMemoryError:PermGen space,发生这种问题的原因是程序中使用了大量的jar或class
- 2、第二种OutOfMemoryError:Java heap space ,发生这种问题的原因是Java虚拟机创建的对象太多
- 3、第三种OutOfMemoryError:unable to create new native thread,创建线程太多,占用内存多大
二、问题分析
初步分析
初步怀疑是创建的线程太多导致的,使用jstack 线程PID 分析打印的日志,发现大量线程处于Runnable状态,基本可以确定是创建线程太多导致的。
代码分析
出问题的服务是报案的服务,在进行抄单时在方法上使用了@Asycn这个注解,进行异步抄单,通过trace-log日志可以看出创建了很多的Thread,经过简单的了解@Async异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,在压测的情况下,会有大量的请求去抄单,这时会不断创建大量的线程,极有可能压爆服务器内存。
借此机会学习一下SimpleAsyncTaskExecutor的源码,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务,核心代码如下:
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
SimpleAsyncTaskExecutor限流实现:
首选任务进来,会循环判断当前执行线程数是否超过concurrencyLimit,如果超了,则当前线程调用wait方法,释放monitor对象锁,进入等待
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
线程任务执行完毕后,当执行线程数会减一,会调用monitor对象的notify方法,唤醒等待状态下的线程,等待状态下的线程会竞争monitor锁,竞争到,会继续执行线程任务。
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
this.monitor.notify();
}
}
}
最终的解决办法:使用自定义线程池
@Configuration
@Slf4j
public class AppConfig implements AsyncConfigurer {
public static final String
ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name = ASYNC_EXECUTOR_NAME)
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//核心线程数,默认值为1
threadPoolTaskExecutor.setCorePoolSize(10);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
//缓存队列
threadPoolTaskExecutor.setQueueCapacity(256);
//超出核心线程数之外的线程在空闲时间最大的存活时间
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
//线程的前缀
threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
//是否等待所有的线程关闭之后采取关闭线程池,默认是false
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待时长
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
//拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* <h2>定义异步任务异常处理类</h2>
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.info("AsycError:{},Method :{},Param:{}", ex.getMessage(), method.getName(), JSON.toJSON(params));
ex.printStackTrace();
//TODO 发送邮件或者发送短信
}
}
}
就不会出现一直创建Thread的情况,导致OOM。