滥用线程,导致线上线程池被撑爆的一次意外

2,172

事故的背景

简单的描述一下业务场景,项目里面有一个分布式定时job ,定期去扒取数据,那么有两层循环,第一层是大概1000条数据 ,然后第二层循环每个数据下面大概20个子数据,然后通过多线程的方式去扒取数据入库。这就是一个简单的业务背景。这个对刚入行的小白写代码得注意了!

作为程序员的我也是第一次遇到这个问题,虽然这个问题解决很简单,但是造成的影响很大,我觉得还是有必要做一个小总结,小伙伴们以后写类似代码的时候就会有注意,这个问题的造成是我一个同事没有理解透线程池,导致的一个很大的问题。那么先说问题之前先扒拉扒拉线程池。

线程池

首先我先说明一点在企业中一般都是自定义线程池,很少使用jdk 给我们提供的几种线程池方法,这样是为了做到一个可控制。

这里帮大家只是一次简单的回顾吧,具体线程池网上已经一大片一大片的文章了。

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) 
   
private static ThreadPoolExecutor poolExecutor = new
            ThreadPoolExecutor(3,
            30,
            1,
            TimeUnit.MINUTES,
            new ArrayBlockingQueue(1000),
            new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));

那么说到线程池无非就理解这几个参数。

流程

  • 假如我们定义的corePoolSize 是5个 maximunPoolSize 是 30, keepAliveTime为一分钟 ,队列大概1000个吧
  • 第1个Task来了放入到队列里面,线程池一直从队列里面获取,获取到第一个Task,发现当前线程池里面线程个数小于corePoolSize ,欧克欧克,兄弟我马上给你创建一个线程。
  • 此时第2个Task 又来了,线程池又从队列里面获取,发现现在的线程还是小于corePoolSize,马上又开启新的线程继续执行
  • 此时第5个Task 来了,那么和corePoolSize 相等了,ok ,那么继续创建一个线程。对于创建的这5个线程是不会销毁的,这也是为什么线程池可以避免重复创建线程代来的消耗。
  • 此时第6个Task 来了,线程池发现corePoolSize 里面有刚刚执行完了的线程,那么此时就不需要创建新的线程了,直接拿来使用,达到了一个复用的效果。
  • 假如我们的Task 很多很多,又比较耗时,corePoolSize 已经没有空闲的线程了,那么接下来就是maximumPoolSize出场了,发现线程池里面此时corePoolSize < num < maximumPoolSize 那么继续创建线程执行Task
  • 那么当线程池数量num > maximumPoolSize,就不会创建新的线程,来的新的Task直接往队列里面仍进去
  • 当队列里面都已经超过了最大数量(我们这里定义的1000) ,相当我们线程池已经爆满了没法接收新的Task了,那么不好意思兄弟,我只能拒绝你了,然后就走我们的MyRejectedExecutionHandler
  • 上面的参数基本已经提到了,还剩一个keepAliveTime 那么这个时间又什么时候用呢,这种场景下corePoolSize < num < maximumPoolSize 超过corePoolSize 空闲的线程的存活时间,比如核心是5个,最大30个,那么多余的25个线程就是超出的线程,简单的总结就是:超出核心线程的线程 存活的时间。

那么上面也是整个线程池的核心流程做了一个描述。

自定义线程池

上面已经描述了线程池的流程和原理,下面自定义线程池直接贴代码了,就不做过多的阐述了。

// 定义一个线程池类
public class MyWorkerThreadPool {

    private static final int MAX_QUEUE_SIZE = 1000;

    private static ThreadPoolExecutor poolExecutor = new
            ThreadPoolExecutor(3,
            30,
            1,
            TimeUnit.MINUTES,
            new ArrayBlockingQueue(MAX_QUEUE_SIZE),
            new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));

    public static void submitTak(Runnable run) {
        poolExecutor.submit(run);
    }

    public static void shutdown() {
        poolExecutor.shutdown();
    }
}

// 定义一个拒绝策略

public class MyRejectedExecutionHandler implements RejectedExecutionHandler{

    private final Log logger = LogFactory.getLog(this.getClass());

    private int  maxQueueSize;

    public MyRejectedExecutionHandler(int maxQueueSize){
        this.maxQueueSize=maxQueueSize;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("提交任务失败了" +
                "当前队列最大长度" +
                "maxQueueSize="+maxQueueSize+
                ",maximumPoolSize="+executor.getMaximumPoolSize());
        if(executor.getQueue().size()<maxQueueSize){
            executor.submit(r);
        }else{
            try {
                Thread.sleep(3000);
                executor.submit(r);
            }catch (Exception e){
                //此异常忽略
                executor.submit(r);
            }
        }
    }
}

事故代码(伪代码)

那么接下来就是重点了,这里只贴一部分伪代码,前面已经说了这是一个分布式定时job,这里是我精简了同事的代码提炼出来的。

 //模拟场景
        for(int i= 0;i< 1000;i++){
            for(int j = 0;j<20;j++){
                MyWorkerThreadPool.submitTak(()->{
                    // 真实业务场景这里非常耗时,
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        


执行结果:
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30

这里 第一层模拟了1000条数据,第二层循环30条数据,同事的代码,导致同一时间我们自定义的线程队列爆满,2000*30 这个是要开多少个线程啊。细细想下是不是很恐怖,这么使用会导致什么后果呢,当前业务又很多都被拒绝掉没有执行,另外当线程池爆满后,我们项目的其它功能执行异步方法也会被拒绝掉,结果可想而知。

那么如何解决了,其实很简单,我们跑这个比较耗时的任务我们可以指定10个线程去跑就是了,这样就不会影响到其它的功能业务。 我这里简单的使用了一个计数器,比如超过了10个线程则阻塞等待一会,跑完一个线程则减一,直接上代码

public class TestMain {
    public static void main(String[] args) throws Exception{
        //模拟场景,这是一个分布式定时任务,所以不会存在并发同时执行的问题
        AtomicInteger threadNum = new AtomicInteger(0);
        // 模拟当前第一层有1000数据
        for(int i= 0;i< 1000;i++){
            // 模拟每条线路有20条子数据
            for(int j = 0;j<20;j++){
                // 多线程拉去爬取网上的数据汇总到数据库
                // 一次最多开启10个线程取执行耗时操作,
                while (threadNum.get() > 10){
                    // 可以小睡一会再看是否有资格执行
                    Thread.sleep(500);
                }
                // 小增加1
                threadNum.incrementAndGet();
                int tempI = i;
                int tempJ = j;
                MyWorkerThreadPool.submitTak(()->{
                    // 真实业务场景这里非常耗时,
                    try {
                        Thread.sleep(5000);
                         System.out.println(Thread.currentThread().getName()+"执行完第一层数据"+ tempI +"第二层:"+tempJ);
                        // 执行完减少一个1
                        threadNum.decrementAndGet();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }


执行结果:
pool-2-thread-2执行完第一层数据0第二层:1
pool-2-thread-1执行完第一层数据0第二层:0
pool-2-thread-3执行完第一层数据0第二层:2
pool-2-thread-2执行完第一层数据0第二层:3
pool-2-thread-3执行完第一层数据0第二层:5
pool-2-thread-1执行完第一层数据0第二层:4
pool-2-thread-3执行完第一层数据0第二层:7
pool-2-thread-1执行完第一层数据0第二层:8
pool-2-thread-2执行完第一层数据0第二层:6
pool-2-thread-1执行完第一层数据0第二层:10
pool-2-thread-3执行完第一层数据0第二层:9
pool-2-thread-2执行完第一层数据0第二层:11
pool-2-thread-3执行完第一层数据0第二层:13
pool-2-thread-2执行完第一层数据0第二层:14
pool-2-thread-1执行完第一层数据0第二层:12

总结

其实我们开发中又很多小细节,只是我们有时候没有注意或对其原理不清楚,有时候写出来的代码就会带来比较糟糕的后果。那么今天这篇就到这里!