What You May Not Know About Thread Pools: Building a Powerful Thread Pool of Your Own


Introduction

Java's built-in thread pools are already fairly complete, but they fall short in some business scenarios. Consider a task-processing system in which every task runs through a thread pool, and processing involves heavy CPU work and network IO. What should happen when a task that normally finishes in one minute ends up taking five minutes or more, whether because of a code bug, an RPC timeout, or bad data? For that kind of workload the JDK's thread pools are not enough on their own: we need a pool with a configurable timeout that automatically interrupts a timed-out task and reruns it.

Approach

Our requirements:

  • detect tasks that have timed out and interrupt them
  • resubmit a task after interrupting it

First we need a way to interrupt a running task; readers with solid fundamentals may know that `Future` exposes a `cancel` method for exactly this. The basic design resembles a watchdog: assign every task an ID, and record for each ID the submission time, the `Future`, and the `Runnable`. A monitor thread runs asynchronously once per second, checks whether each ID's submission time has exceeded the timeout, and if so calls `future.cancel` and resubmits the task to the pool.
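Before building the pool, it helps to confirm the primitive the design rests on. The following is a minimal, self-contained sketch (class and method names are illustrative, not part of the pool implementation) showing that `Future.cancel(true)` delivers an interrupt to a task blocked in `Thread.sleep`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDemo {
  // returns true if the task observed an interrupt after cancel(true)
  static boolean cancelInterruptsSleep() throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicBoolean interrupted = new AtomicBoolean(false);
    // a task that would run for 30s if nobody interrupted it
    Future<?> future = pool.submit(() -> {
      try {
        Thread.sleep(30_000);
      } catch (InterruptedException e) {
        // cancel(true) delivers an interrupt, which wakes the sleep
        interrupted.set(true);
      }
    });
    Thread.sleep(200);       // let the task start sleeping
    future.cancel(true);     // request interruption
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the catch block to run
    return future.isCancelled() && interrupted.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println("cancel(true) interrupted the task: " + cancelInterruptsSleep());
  }
}
```

Note that `cancel(true)` only *requests* interruption; a task that ignores interrupts (for example, one spinning in a CPU loop without checking `Thread.interrupted()`) will not actually stop, which is a limitation the watchdog below inherits.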

Implementation


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

//A custom thread pool that interrupts timed-out tasks and automatically reruns them
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
  //single-threaded scheduler that runs the watchdog
  private static final ScheduledExecutorService monitorExecutor = Executors.newScheduledThreadPool(1);
  //task ID -> submission time (ms)
  private final ConcurrentMap<Integer, Long> aliveThreadRefreshTimeMap = new ConcurrentHashMap<>();
  //task ID -> its Future
  private final ConcurrentMap<Integer, Future<?>> aliveThreadFutureMap = new ConcurrentHashMap<>();
  //task ID -> its Runnable
  private final ConcurrentMap<Integer, Runnable> aliveThreadTaskMap = new ConcurrentHashMap<>();
  //task ID generator
  private final AtomicInteger aliveThreadNum = new AtomicInteger(0);
  //timeout after which a task is cancelled and resubmitted (ms)
  private final long retryTime;
  //whether the watchdog has been scheduled; volatile because it is read by submitting threads
  private volatile boolean isMonitoring = false;

  //the timeout is supplied at construction time
  public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long retryTime) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    this.retryTime = retryTime;
  }

  //Override submit so that every submitted task is registered for monitoring
  @Override
  public Future<?> submit(Runnable task) {
    if (task == null) {
      throw new NullPointerException();
    }
    Future<?> future = super.submit(task);
    afterSubmit(future, task);
    return future;
  }

  //Record the task's info in the maps under a freshly claimed ID,
  //and start the watchdog on the first submission
  private void afterSubmit(Future<?> future, Runnable task) {
    int id = aliveThreadNum.getAndIncrement(); //claim a unique ID atomically
    aliveThreadFutureMap.put(id, future);
    aliveThreadTaskMap.put(id, task);
    aliveThreadRefreshTimeMap.put(id, System.currentTimeMillis());
    initializeMonitorThread();
  }

  //Schedule the watchdog; synchronized so that racing first submissions schedule it only once
  private synchronized void initializeMonitorThread() {
    //the watchdog needs to be started only once
    if (isMonitoring) {
      return;
    }
    isMonitoring = true;
    //the watchdog fires once per second
    monitorExecutor.scheduleAtFixedRate(() -> {
      //scheduleAtFixedRate silently stops rescheduling after an uncaught exception,
      //so catch everything inside the task itself
      try {
        System.out.printf("monitor thread start..., aliveThreadRefreshTimeMap:%s\n", aliveThreadRefreshTimeMap);
        List<Integer> removeIdList = new ArrayList<>();
        //walk the map of submission times
        for (int threadId : aliveThreadRefreshTimeMap.keySet()) {
          long currentTime = System.currentTimeMillis();
          long refreshTimes = currentTime - aliveThreadRefreshTimeMap.get(threadId);
          System.out.printf("thread %d, refreshTimes is %d\n", threadId, refreshTimes);
          //An elapsed time above the threshold does not necessarily mean a timeout: the map is
          //only written on submission, so the task may have finished long ago without updating it
          if (refreshTimes > retryTime) {
            System.out.printf("alive thread %d: is %dms to refresh, will restart\n", threadId, refreshTimes);
            Future<?> future = aliveThreadFutureMap.get(threadId);
            future.cancel(true);
            //isCancelled() == true means the timed-out task was successfully interrupted;
            //false means it had already finished, i.e. it never actually timed out.
            //We check isCancelled() instead of the return value of cancel() because a task that
            //was cancelled earlier but whose resubmission failed must stay eligible for retry
            if (!future.isCancelled()) {
              System.out.println("canceled failed with thread id : " + threadId + " the thread may already stopped");
              removeIdList.add(threadId);
              continue;
            }
            //resubmit the timed-out task
            Future<?> resubmitFuture;
            try {
              resubmitFuture = super.submit(aliveThreadTaskMap.get(threadId));
            } catch (Exception e) {
              //Submission can fail when the pool is saturated and the rejection policy fires.
              //We simply skip here, so the task is retried on the next watchdog tick
              System.out.println("error when resubmit , the threadPool may be full will retry with  error info: " + e.getMessage());
              continue;
            }
            //resubmission succeeded: register the task again under a new ID
            int newId = aliveThreadNum.incrementAndGet();
            aliveThreadRefreshTimeMap.put(newId, System.currentTimeMillis());
            aliveThreadFutureMap.put(newId, resubmitFuture);
            aliveThreadTaskMap.put(newId, aliveThreadTaskMap.get(threadId));
            removeIdList.add(threadId);
            System.out.printf("restart success, thread id is:%d\n", newId);
          }
        }
        for (int id : removeIdList) {
          //drop the bookkeeping for tasks that finished or were resubmitted
          aliveThreadFutureMap.remove(id);
          aliveThreadTaskMap.remove(id);
          aliveThreadRefreshTimeMap.remove(id);
        }
      } catch (Exception e) {
        //never let an exception escape, or the watchdog stops running
        System.out.println("monitor thread error: " + e.getMessage());
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

}

Now that we have a customized pool, how do we use it?

  @Test
  public void test1() {
    //create the pool; core and max sizes are both 10, the timeout is 2500ms
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
    //BasicThreadFactory comes from Apache commons-lang3
    BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("processor-" + "-thread-%s");
    MyThreadPoolExecutor executorPool = new MyThreadPoolExecutor(
        10,
        10, 30L, TimeUnit.SECONDS, workQueue, builder.build(), 2500L);

    //a short task that finishes after 1ms
    executorPool.submit(() -> {
      try {
        System.out.println(Thread.currentThread().getName() + " start sleep 1 ms....");
        Thread.sleep(1);
      } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + e.getMessage());
      }
    });
    //a long task that would run for 30s and will keep timing out
    executorPool.submit(() -> {
      try {
        System.out.println(Thread.currentThread().getName() + " start sleep 30 s....");
        Thread.sleep(30000);
      } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + e.getMessage());
      }
    });

    //keep the test alive so we can watch the watchdog work
    while (true);
  }

Since everything is wrapped inside the pool, all we need to do is build it with Apache's `BasicThreadFactory.Builder`, pass in a timeout of 2500ms, and start two tasks: one that finishes in 1ms and one that runs for 30s. We expect the first task to complete on its first run, and the second to be interrupted and rerun on every timeout:

processor--thread-1 start sleep 1 ms....
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046}
processor--thread-2 start sleep 30 s....
thread 0, refreshTimes is 2
thread 1, refreshTimes is 0
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 1005
thread 1, refreshTimes is 1004
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 2004
thread 1, refreshTimes is 2002
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 3006
alive thread 0: is 3006ms to refresh, will restart
canceled failed with thread id : 0 the thread may already stopped
thread 1, refreshTimes is 3004
alive thread 1: is 3004ms to refresh, will restart
processor--thread-2sleep interrupted
restart success, thread id is:3
processor--thread-3 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 1996
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 2998
alive thread 3: is 2998ms to refresh, will restart
processor--thread-3sleep interrupted
restart success, thread id is:4
processor--thread-4 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 1000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2999
alive thread 4: is 2999ms to refresh, will restart
processor--thread-4sleep interrupted
restart success, thread id is:5
processor--thread-5 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 3000
alive thread 5: is 3000ms to refresh, will restart
processor--thread-5sleep interrupted
restart success, thread id is:6
processor--thread-6 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 1999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 3000
alive thread 6: is 3000ms to refresh, will restart
processor--thread-6sleep interrupted
restart success, thread id is:7
processor--thread-7 start sleep 30 s....

This matches our expectations: `canceled failed with thread id : 0 the thread may already stopped` means the first task had already finished, and from then on the second task is interrupted and automatically rerun every 3 seconds. A few design points deserve attention:

  • Concurrency: the maps are touched both by `submit` and by the monitor thread, so they must be `ConcurrentHashMap`s; for the same reason the ID counter must be an `AtomicInteger`.
  • If a task finishes quickly without timing out, the ideal behavior would be to clean up its bookkeeping the moment it completes. But our logic is encapsulated inside the pool and cannot intrude on the caller's business code, so we detect this inside the monitor instead: a task that has already completed cannot be cancelled.
  • On resubmission, if the pool is full the rejection policy fires; when we catch that exception we deliberately keep the bookkeeping and skip, so the next monitor tick retries the task (at that point `isCancelled()` returns true because the task was already cancelled once, which is exactly why we test `isCancelled()` rather than the return value of `cancel()`). Ideally the handling would be customized per rejection policy; that is left as an exercise for the reader.
  • `scheduleAtFixedRate` has a potential pitfall: if one execution throws an exception, the task is never scheduled again, so the monitor's code should catch such exceptions and handle them itself.
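The last pitfall is easy to verify with a minimal sketch (class and method names are illustrative): a periodic task that lets an exception escape runs exactly once, while one that catches its own exceptions keeps ticking.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateDemo {
  // a 100ms-period task that throws: the schedule silently stops after the first run
  static int runsWhenThrowing() throws InterruptedException {
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    AtomicInteger runs = new AtomicInteger();
    ses.scheduleAtFixedRate(() -> {
      runs.incrementAndGet();
      throw new RuntimeException("boom"); // suppresses all subsequent executions
    }, 0, 100, TimeUnit.MILLISECONDS);
    Thread.sleep(1000);
    ses.shutdownNow();
    return runs.get(); // stays at 1
  }

  // the same task, but it catches its own exception: the schedule survives
  static int runsWhenCatching() throws InterruptedException {
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    AtomicInteger runs = new AtomicInteger();
    ses.scheduleAtFixedRate(() -> {
      try {
        runs.incrementAndGet();
        throw new RuntimeException("boom");
      } catch (RuntimeException e) {
        // swallowed, so the next execution is still scheduled
      }
    }, 0, 100, TimeUnit.MILLISECONDS);
    Thread.sleep(1000);
    ses.shutdownNow();
    return runs.get(); // roughly 10
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("throwing task ran " + runsWhenThrowing() + " time(s)");
    System.out.println("catching task ran " + runsWhenCatching() + " time(s)");
  }
}
```

This is why the monitor lambda in the pool above must wrap its whole body in a try/catch.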

Conclusion

For real business systems, a customized thread pool is just one program-level way to solve the problem, and it has drawbacks: it consumes extra system resources, and an implementation built on `scheduleAtFixedRate` has potential performance issues. Many other business-driven designs exist, and we should pick the one that fits our own workload.

For example, suppose we run an order-processing system where each incoming order must fetch data from several other services to become a complete order, and where we also perform some heavy computation over that data. Instead of this thread pool, we could lean on other middleware: when we detect an RPC timeout, we can push the order into Redis together with a configured next-execution time, and run a separate thread that pulls due orders from Redis as compensation. Introducing Redis adds complexity, but compared with keeping the state in memory it makes the order data far more durable: if the machine dies, in-memory state is gone, whereas with Redis, combined with business-level control, we can achieve a more robust result.
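The compensation loop described above can be sketched in memory. The hypothetical `RetryQueueSketch` below (my own names, not from any library) uses a `TreeMap` keyed by next-execution time as a stand-in for a Redis sorted set, where in production the failure path would do a ZADD and the compensator would poll with ZRANGEBYSCORE:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetryQueueSketch {
  // next-execution time (ms) -> order IDs due at that time;
  // in production this would be a Redis sorted set scored by next-run time
  private final TreeMap<Long, List<String>> retryQueue = new TreeMap<>();

  // called when processing an order fails, e.g. on an RPC timeout
  public synchronized void scheduleRetry(String orderId, long nextRunAtMillis) {
    retryQueue.computeIfAbsent(nextRunAtMillis, t -> new ArrayList<>()).add(orderId);
  }

  // called periodically by the compensation thread: pop every order that is now due
  public synchronized List<String> drainDue(long nowMillis) {
    List<String> due = new ArrayList<>();
    Map<Long, List<String>> head = retryQueue.headMap(nowMillis, true);
    head.values().forEach(due::addAll);
    head.clear(); // the headMap view is backed by the TreeMap, so this removes drained entries
    return due;
  }

  public static void main(String[] args) {
    RetryQueueSketch q = new RetryQueueSketch();
    q.scheduleRetry("order-1", 1_000L); // retry at t=1s
    q.scheduleRetry("order-2", 5_000L); // retry at t=5s
    System.out.println("due at t=2000ms: " + q.drainDue(2_000L)); // [order-1]
    System.out.println("due at t=6000ms: " + q.drainDue(6_000L)); // [order-2]
  }
}
```

Unlike the in-memory version, the Redis-backed variant survives a process crash: any order already ZADDed is still there for the compensator after a restart, which is exactly the durability argument made above.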