速读Java线程池

950 阅读8分钟

一、前言

线程池是开发中绕不开的一个知识点。
对于移动开发而言,网络框架、图片加载、AsyncTask、RxJava, 都和线程池有关。
正因为线程池应用如此广泛,所以也成了面试的高频考点。

我们今天就来讲讲线程池的基本原理和周边知识。
先从线程的生命周期开始。

二、线程生命周期

线程是程序执行流的最小单元。
Java线程可分为五个阶段:

  • 新建(New): 创建Thread对象,并且未调用start();
  • 就绪(Runnable): 调用start()之后, 等待操作系统调度;
  • 运行(Running): 获取CPU时间分片,执行 run()方法中的代码;
  • 阻塞(Blocked): 线程让出CPU,进入等待(就绪);
  • 终止(Terminated): 自然退出或者被终止。

线程的创建和销毁代价较高,当有大量的任务时,可复用线程,以提高执行任务的时间占比。
如上图,不断地 Runnable->Runing->Blocked->Runnable, 就可避免过多的线程创建和销毁。
此外,线程的上下文切换也是开销比较大的,若要使用线程池,需注意设置合理的参数,控制线程并发。

三、ThreadPoolExecutor

JDK提供了一个很好用的线程池的封装:ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize:核心线程大小
maximumPoolSize:线程池最大容量(需大于等于corePoolSize,否则会抛异常)
keepAliveTime:线程执行任务结束之后的存活时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略

线程池中有两个任务容器:

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;

前者用于存储Worker,后者用于缓冲任务(Runnable)。
下面是execute方法的简要代码:

    public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
        }
        // 若workQueue已满,offer会返回false
        if (isRunning(c) && workQueue.offer(command)) {
            // ...
        } else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        int wc = workerCountOf(c);
        if (wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        workers.add(w);
        t.start();
    }

一个任务到来,假设此时容器workers中Worker数的数量为c,则

  • 1、当c < corePoolSize时,创建Worker来执行这个任务,并放入workers
  • c >= corePoolSize时,
    • 2、若workQueue未满,则将任务放入workQueue
    • workQueue已满,
      • 3、若c < maximumPoolSize,创建Worker来执行这个任务,并放入workers
      • 4、若c >= maximumPoolSize, 执行拒绝策略。

很多人在讲线程池的时候,干脆把workers说成“线程池”,将Worker和线程混为一谈;
不过这也无妨,能帮助理解就好,就像看到一杯水,说“这是水”一样,很少人会说这是“杯子装着水”。

Worker和线程,好比汽车和引擎:汽车装着引擎,汽车行驶,其实是引擎在做功。
Worker本身实现了Runnable,然后有一个Thread和Runnable的成员;
构造函数中,将自身(this)委托给自己的成员thread
thread.start(), Worker的run()函数被回调,从而开启 “执行任务-获取任务”的轮回。

    private final class Worker implements Runnable{
        final Thread thread;
        Runnable firstTask;
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
    }

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            task.run();
        }
    }

    private Runnable getTask() {
        for (;;) {
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }

当线程执行完任务(task.run()结束),会尝试去workQueue取下一个任务,
如果workQueue已经清空,则线程进入阻塞态:workQueue是阻塞队列,如果取不到元素会block当前线程。
此时,allowCoreThreadTimeOuttrue, 或者 n > corePoolSize,workQueue等待keepAliveTime的时间,
如果时间到了还没有任务进来, 则退出循环, 线程销毁;
否则,一直等待,直到新的任务到来(或者线程池关闭)。
这就是线程池可以保留corePoolSize个线程存活的原理。

从线程的角度,要么执行任务,要么阻塞等待,或者销毁;
从任务的角度,要么马上被执行,要么进入队列等待被执行,或者被拒绝执行。
上图第2步,任务进入workQueue, 如果队列为空且有空闲的Worker的话,可马上得到执行。

关于workQueue,常用的有两个队列:

  • LinkedBlockingQueue(capacity):
    传入capacity(大于0), 则LinkedBlockingQueue的容量为capacity;
    如果不传,默认为Integer.MAX_VALUE,相当于无限容量(不考虑内存因素),多少元素都装不满。
  • SynchronousQueue 除非另一个线程试图移除获取元素,否则不能添加元素。

四、 ExecutorService

为了方便使用,JDK还封装了一些常用的ExecutorService:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}
类型最大并发适用场景
newFixedThreadPoolnThreads计算密集型任务
newSingleThreadExecutor1串行执行的任务
newCachedThreadPoolInteger.MAX_VALUEIO密集型任务
newScheduledThreadPoolInteger.MAX_VALUE定时任务,周期任务

newSingleThreadExecutor 其实是 newFixedThreadPool的特例 (nThreads=1),
写日志等任务,比较适合串行执行,一者不会占用太多资源,二者为保证日志有序与完整,同一时间一个线程写入即可。

众多方法中,newCachedThreadPool() 是比较特别的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 为 SynchronousQueue。

结合上一节的分析:
当一个任务提交过来,由于corePoolSize = 0,任务会尝试放入workQueue;
如果没有线程在尝试从workQueue获取任务,offer()会返回false,然后会创建线程执行任务;
如果有空闲线程在等待任务,任务可以放进workQueue,但是放进去后马上就被等待任务的线程取走执行了。
总的来说,就是有空闲线程则交给空闲线程执行,没有则创建线程执行;
SynchronousQueue类型workQueue并不保存任务,只是一个传递者。
所以,最终效果为:所有任务立即调度,无容量限制,无并发限制。

这样的特点比较适合网络请求任务。
OkHttp的异步请求所用线程池与此类似(除了ThreadFactory ,其他参数一模一样)。

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

五、 任务并发的估算

一台设备上,给定一批任务,要想最快时间完成所有任务,并发量应该如何控制?
并发量太小,CPU利用率不高;
并发量太大,CPU 满负荷,但是花在线程切换的时间增加,用于执行任务的时间反而减少。

一些文章提到如下估算公式:

M:并发数;
C:任务占用CPU的时间;
I:等待IO完成的时间(为简化讨论,且只考虑IO);
N:CPU核心数。

代入特定参数验证这条公式:
1、比方说 I 接近于0,则M≈N,一个线程对应一个CPU,刚好满负荷且较少线程切换;
2、假如 I=C,则M = 2N,两个线程对应一个CPU,每个线程一半时间在等待IO,一半时间在计算,也是刚好。

遗憾的是,对于APP而言这条公式并不适用:

  • 任务占用CPU时间和IO时间无法估算
    APP上的异步任务通常是碎片化的,而不同的任务性质不一样,有的计算耗时多,有的IO耗时多;
    然后同样是IO任务,比方说网络请求,IO时间也是不可估计的(受服务器和网速影响)。
  • 可用CPU核心可能会变化
    有的设备可能会考虑省电或者热量控制而关闭一些核心;
    大家经常吐槽的“一核有难,九核围观”映射的就是这种现象。

虽然该公式不能直接套用来求解最大并发,但仍有一些指导意义:
IO等待时间较多,则需要高的并发,来达到高的吞吐率;
CPU计算部分较多,则需要降低并发,来提高CPU的利用率。

换言之,就是:
计算密集型任务时控制并发小一点;
IO密集型任务时控制并发大一点。

问题来了,小一点是多小,大一点又是多大呢?
说实话这个只能凭经验了,跟“多吃水果”,“加盐少许”一样,看实际情况而定。

比如RxJava就提供了Schedulers.computation()Schedulers.io()
前者默认情况下为最大并发为CPU核心数,后者最大并发为Integer.MAX_VALUE(相当于不限制并发)。
可能是作者也不知道多少才合适,所以干脆就不限制了。
这样其实很危险的,JVM对进程有最大线程数限制,超过则会抛OutOfMemoryError
Kotlin协程也有Dispatchers.DefaultDispatchers.IO, Dispatchers.IO就限制了最多64个任务并发。

开发过程中,如果使用RxJava或者协程,就不用考虑线程池corePoolSize多少了,简单地判断任务是否涉及IO即可。

六、总结

回顾文章的内容,大概有这些点:

  • 介绍了线程的生命周期;
  • 从线程池的参数入手,分析这些参数是如何影响线程池的运作;
  • 列举常用的ExecutorService,介绍其各自特点和适用场景;
  • 对并发估算的一些理解。

文章没有对Java线程池做太过深入的探讨,而是从使用的角度讲述基本原理和周边知识;
第二节有结合关键代码作简要分析,也是点到为止,目的在于加深对线程池相关参数的理解,
以便在平时使用线程池的时候合理斟酌,在阅读涉及线程池的开源代码时也能“知其所以然”。