Java 线程池(二)

260 阅读14分钟

简介

在上篇 Java 线程池(一) 我们介绍了线程池中一些的重要参数和具体含义,这篇我们看一看在 Java 中是如何去实现线程池的,要想用好线程池,只知其然是远远不够的,我们需要深入实现源码去了解线程池的具体实现细节,这样才能更好的使用到我们的工作中,当出现问题时能快速找到问题根源所在。

线程池如何处理提交的任务

我们向线程池提交任务有两种方式,分别是通过 submit 方法提交和通过 execute 方法提交,这两种方式的区别为 execute 只能提交 Runnable 类型的任务并且没有返回值,而 submit 既能提交 Runnable 类型的任务也能提交 Callable(JDK 1.5+)类型的任务并且会有一个类型 Future 的返回值,我们知道 Runnable 是没有返回值的,所以只有当提交 Callable 类型的任务时才会有返回值,而提交 Runnable 的返回值是 nullexecute 执行任务时,如果此时遇到异常会直接抛出,而 submit 不会直接抛出,只有在使用 Futureget 方法获取任务的返回结果时,才会抛出异常。 通过查看 ThreadPoolExecutor 的源码我们发现,其 submit 方法是继承自其抽象父类 AbstractExecutorService 而来的,有三个重载的方法,分别可以提交 Runnable 类型和 Callable 类型的任务。无论是哪个 submit 方法最终还是调用了 execute 方法来实现的。方法源码如下:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

首先对提交的任务进行判非空指针后,三个方法都是调用 newTaskFor 方法把任务统一封装成 RunnableFuture 对象,然后把封装好的对象作为 execute 方法的入参去执行,而此时 execute 方法还未实现,这个方法是在 AbstractExecutorService 的继承类 ThreadPoolExecutor 中实现。下面看看 newTaskFor 方法是如何封装我们提交的任务的,两个重载方法的源码如下:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

那么这个 FutureTask 是个什么东东呢,进入其源码发现它实现了 RunnableFuture 接口,而 RunnableFuture 接口的作用正如其名,它是 RunnableFuture 的结合体,表示一个能异步返回结果的线程。我们知道 Runnable 是不能返回结果的,所以上面第一个 newTaskFor(Runnable runnable, T value) 方法的第二个参数 value 的作用就是指定返回结果。其实最后也是通过 RunnableAdapterRunnablevalue 封装成 Callable 的。下面我们看看 execute 方法是怎么处理的,方法源码如下:

thread-pool-3.png

第 ① 步 获取当前的 ctl 值,在上篇 Java 线程池(一) 中说过,变量 ctl 存储了线程池的工作状态 runState 和线程池中正在运行的线程数 workerCount第 ② 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果线程池当前工作线程数小于核心线程数 corePoolSize,则进行第 ③ 步。 第 ③ 步 通过 addWorker 方法新建一个线程加到线程池中,addWorker 方法的第二个参数如果为 true 则限制添加线程的数量是根据 corePoolSize 来判断,反之则根据 maximumPoolSize 来判断,并把任务添加到该线程中。 第 ④ 步 如果添加失败,则重新获取 ctl 的值。 第 ⑤ 步 如果当前线程池的状态是运行状态(state < SHUTDOWN)并且把任务成功添加到队列中。 第 ⑥ 步 重新获取 ctl 的值,再次判断线程池的运行状态,如果不是运行状态,要从队列中移除任务,因为到这一步了,意味着之前已经把任务成功添加到队列中了,所以需要从队列移除。移除成功后调用拒绝策略对任务进行处理,整个 execute 方法结束(PS:为什么不在入队列之前就先判断线程池的状态呢?因为判断一个线程池工作处于运行状态到执行入队列操作这段时间,线程池可能已经被其它线程关闭了,所以提前判断其实毫无意义)。 第 ⑦ 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果是 0 则执行 addWorker(null, false) 方法,第一个参数传 null 表示只是在线程池中创建一个线程出来,但是没有立即启动,因为我们创建线程池时可能要求核心线程数量为 0。第二个参数为 false 表示限制添加线程时根据 maximumPoolSize 来判断,如果当前线程池中正在运行线程数量大于 0 ,则直接返回,因为在上面第 ⑤ 步已经把任务成功添加到队列 workQueue 中,它会在将来的某个时刻执行到。 第 ⑧ 步 如果执行到这个地方,只有两种情况,一种是线程池的状态已经不是运行状态了,另一种是线程池是运行状态,但是此时线程池的工作线程数大于等于核心线程数(workerCount >= corePoolSize)并且队列 workQueue 已满。这时会再次调用 addWorker 方法,第二个参数传的 false,意味着限制添加线程的数量是根据 maximumPoolSize 来判断的,如果失败则调用拒绝策略对任务进行处理,整个 execute 方法结束。 上面的 execute 方法中多次调用 addWorker,该方法的主要作用就是创建一个线程来执行任务。addWorker 的方法签名如下:

addWorker(Runnable firstTask, boolean core)

第一个参数 firstTask 如果不为 null,则创建的线程首先执行 firstTask 任务,然后才会从队列中获取任务,否则会直接从队列中获取任务。第二个参数如果为 true,则表示限制添加线程时根据 corePoolSize 来判断,否则根据maximumPoolSize 来判断。我们看看 addWorker 方法的源码,方法源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

方法首先获取线程池 ctl 属性的值,该属性包含了线程池的运行状态工作线程数,通过 runStateOf 获取线程池的运行状态,然后执行下面这个比较复杂的条件判断

thread-pool-4.png

第 ① 个条件表示此时线程池已经不再接受新任务了,接下来的 ②、③、④ 三个判断条件只要有一个不满足,那么方法就会返回 false,方法结束。第 ② 个条件表示线程池为关闭状态,处于关闭状态的线程池不会处理新提交的任务,但会处理完已处理的任务,第 ③ 个条件为 firstTask 为 null,第 ④ 个条件为队列不为空。我们看看如果线程池此时为关闭状态的情况,这种情况线程池不会接受新提交的任务,所以此时如果传入的 firstTask 不为 null,则会直接返回 false;然后如果 firstTasknull,并且队列 workQueue 为空,此时也会返回 false,因为此时队列里已经没有任务了,那么也不需要再添加线程了,然后接下来会进入一个循环。

thread-pool-5.png

第 ① 步 调用 workerCountOf 方法获取当前线程池的工作线程数 第 ② 步 如果当前线程池的工作数大于 CAPACITY 也就是 ctl 的低 29 位的最大值,则返回 false,如果不大于 CAPACITY,然后根据 core (该方法的第二个参数)来判断是和 corePoolSize 比较还是和 maximumPoolSize 比较,如果比这个值大则返回 false第 ③ 步 使用 ctlcompareAndSet 原子方法尝试把工作线程数 workerCount + 1,如果增加成功,退出第一层循环。 第 ④ 步 如果增加线程池工作线程数失败,则重新获取 ctl 的值。 第 ⑤ 步 调用 runStateOf 获取线程池的状态,如果不等于方法前面获取的 rs,说明线程池的状态已经改变了,回到第一层循环继续执行。 接下来会启动线程执行任务,源码如下:

thread-pool-6.png

第 ① 步 根据 firstTask 创建 Worker 对象,每一个 Worker 对象都会创建一个线程,然后会使用重入锁 ReentrantLock 进行加锁操作。 第 ② 步 调用 runStateOf 获取线程池的状态,然后进行一个条件判断,第一个 rs < SHUTDOWN 表示线程池是运行状态。如果线程池是运行状态或者线程池是关闭状态并且 firstTasknull,那么就往线程池中加入线程(因为当线程池是 SHUTDOWN 状态时不会再向线程池添加新的任务,但会执行队列 workQueue 中的任务)。这里的 workers 是一个 HashSet,所以其 add 方法不是线程安全的,所以需要加锁操作。然后修改线程池中出现过的最大线程数量 largestPoolSize 记录和把是否添加成功标记 workerAddedtrue。如果 workerAddedtrue 那么会启动线程并把线程是否启动标记 workerStarted 改为 true第 ③ 步 根据线程是否启动 workerStarted 标记来判断是否需要进行失败的操作。包含从 workers 移除当前的 worker、线程池的工作线程数减 1、尝试终端线程池。

线程池中线程是如何执行的

线程池的线程执行是调用 Workerthread 属性的 start 方法,而 threadrun 方法实际上调用了 Worker 类的 runWorker 方法,所以我们直接来看看 runWorker 方法的源码:

thread-pool-7.png

第 ① 步 获取第一个任务,while 循环不断地通过 getTask 方法从队列中获取任务。 第 ② 步 这个判断条件目的是要保证如果线程池正在停止,要保证当前线程是中断状态,如果是的话,要保证当前线程不是终端状态。 第 ③ 步 方法 beforeExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之前添加操作的方法。 第 ④ 步 执行 task.run() 执行任务(PS:这里为什么是调用 run 方法而不是调用 start 方法呢?我们知道当调用了 start 方法后操作系统才会给我们创建一个独立的线程来运行,而调用 run 方法只是一个普通的方法调用,而线程池正好就是需要它是一个普通的方法才能进行任务的调度。我们可以想象一下,假如这里是调用的 Runnable 的 start 方法,那么会是什么结果呢。如果我们往一个核心线程数、最大线程数为 3 的线程池里丢了 500 个任务,那么它会额外的创建 500 个线程,同时每个任务都是异步执行的,结果一下子就执行完毕了,根本无法对任务进行调度。从而没法做到由这 3 个 Worker 线程来调度这 1000 个任务,而只有当做一个普通的 run 方法调用时才能满足线程池的这个要求)。 第 ⑤ 步 方法 afterExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之后添加操作的方法。completedAbruptly 变量是用来表示在执行任务过程中是否出现了异常,processWorkerExit 方法中会对该变量的值进行判断。 接下来我们看看 getTask 方法是如何从队列中获取任务的,方法源码如下:

thread-pool-8.png

第 ① 步 如果线程池不是运行状态,则判断线程池是否正在停止或者当前队列为空,如果条件满足将线程池的工作线程数减一并返回 null。因为如果当前线程池状态的值是 SHUTDOWN 或以上时,就不允许再向队列中添加任务了。 第 ② 步 这里的 timed 变量用来标记是否需要线程进行超时控制,allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时。wc > corePoolSize 表示当前线程池中的工作线程数量大于核心线程数量,对于超过核心线程数量的这些线程,需要进行超时控制。 第 ③ 步 第一个判断 wc > maximumPoolSize 如果成立是因为可能在此方法执行阶段同时执行了线程池的 setMaximumPoolSize 方法;第二个判断 timed && timedOut 如果成立表示当前操作需要进行超时控制,并且上次从队列中获取任务发生了超时(timeOut 变量的值表示上次从阻塞队列中取任务时是否超时);第三个判断 wc > 1 || workQueue.isEmpty() 如果线程池中工作线程数量大于 1,或者队列是空的,那么尝试将 workerCount 减一,如果减一失败,则返回重试。如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。 第 ④ 步 根据 timed 来判断,如果为 true,则通过阻塞队列的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null,否则通过 take 方法,如果这时队列为空,则 take 方法会阻塞直到队列不为空。如果 r == null,说明已经超时,timedOut 设置为 true第 ⑤ 步 如果获取任务时当前线程发生了中断,则设置 timedOutfalse 并重新循环重试。

关闭线程池

线程池的关闭一般都是使用 shutdown 方法和 shutdownNow 方法,两者的区别是前面的 shutdown 方法不会执行新的任务,但是会执行完当前正在执行的任务,而后面的 shutdownNow 方法会立即停止当前线程池,不管当前是否有线程在执行。一般都是使用 shutdown 方法来停止线程池,其方法源码如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

advanceRunState(SHUTDOWN) 方法的作用是通过 CAS 原子操作将线程池的状态更改为关闭状态。interruptIdleWorkers 方法是对空闲的线程进行中断,其实是调用重载带参数的函数 interruptIdleWorkers(false)。然后 onShutdown 方法和上文提到的 beforeExecuteafterExecute 方法一样,在类 ThreadPoolExecutor 是空实现,也是个钩子函数。我们看看 interruptIdleWorkers 的实现源码:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

先进行加锁操作,然后遍历 workers 容器,也就是遍历线程池中的线程,对每个线程进行 tryLock 操作,如果成功说明线程空闲,则设置其中断标志位。而线程是否响应中断则交给我们定义任务的人来决定。

总结

本文比较详细的分析了线程池任务的提交、线程的执行、线程池的关闭的工作流程。通过学习线程池相关的源码后,看到了在其内部用运用了很多多线程的解决方法,有如下几个方式:

  1. 通过定义重入锁 ReentrantLock 变量 mainLock 来解决并发多线程的安全问题
  2. 利用等待机制来实现线程之间的通讯问题 除了内置的功能外,ThreadPoolExecutor 也向外提供了两个接口供我们自己扩展满足我们需求的线程池,这两个接口分别是:beforeExecute 任务执行前执行的方法,afterExecute 任务执行结束后执行的方法。