个人从ThreadPoolExecutor开始的线程池笔记

1,089 阅读23分钟

部分内容摘自《Java并发编程的艺术》

池化技术

池化技术的主要目的在应用启动时预先保存一些资源放到池中管理,当需要获取资源时可以直接从池中获取,使用完毕后重新放回池中,从而减少资源创建与销毁的消耗。常见的池化技术实现有线程池、连接池、内存池等,线程池中的资源便是线程,连接池中的资源便是连接,内存池中的资源便是内存。

线程池的优点

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程任务接口 Runnable && Callable

@FunctionalInterface
public interface Runnable {
    /**
     * 使用实现接口Runnable的对象创建线程时,启动线程会在单独执行的线程中调用对象的run方法。
     */
    public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,无法计算则抛出异常
     *
     * @return 计算结果 
     * @throws Exception 无法计算则抛出异常
     */
    V call() throws Exception;
}

Callable接口与Runnable相似,两者实例均可被线程异步执行。但是Runnable不会返回结果,也不能引发已检查的异常,直接把异常打印,而Callable会把检查到的异常捕获到返回的Future中,Callable执行结束后可通过Future.get()获取结果或执行过程中抛出的异常。例:

ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
Callable callable = () -> 1 / 0;
Runnable runnable = () -> System.out.println(1 / 0);
Future future =  executorService.submit(callable);
try {
    System.out.println("future get:" + future.get());
}catch (Exception e){
    System.err.println("future get exception");
    e.printStackTrace();
}
executorService.execute(runnable);
executorService.shutdown();
System.out.println("shutdown success");

控制台将打印"future get exception"、2个异常及"shutdown success",由于Future.get()会阻塞主线程抛出异常,所以需进行try-catch处理避免线程池无法正常关闭,而execute()的异常则不会被检测到所以不捕捉线程池也能正常关闭。 Executors.callable()可以使Runnable转换为Callable

各种线程池Executor相关类的关系与简介

ThreadPool.png

  • Executor接口:提供任务提交与任务执行分离的方法execute()
  • ExecutorService接口:Executor的扩展接口,提供任务终止、跟踪的抽象方法
  • AbstractExecutorService抽象类:提供ExecutorService的默认实现
  • ScheduledExecutorService接口:ExecutorService的扩展接口,提供任务延迟、定时执行的抽象方法
  • ThreadPoolExecutor类(核心):ExecutorService的完整实现类
  • ThreadPoolExecutor.Worker类:AbstractQueuedSynchronizer扩展类,主要维护运行任务的线程的中断控制状态以及其它次要的记录
  • ScheduledThreadPoolExecutor类:继承了ThreadPoolExecutor并实现了ScheduledExecutorService,提供了任务延迟、定时执行的实现

ThreadPoolExecutor类源码解析

属性

corePoolSize:核心线程数,即使没有任务线程池中依旧保留的线程数,可将allowCoreThreadTimeOut设为true使核心线程超过存活时间时释放,即核心线程数可为0 maximumPoolSize:最大线程数,线程池中允许的最大线程数,队列任务满时同时运行的线程数 keepAliveTime:当线程数大于核心数时,多余的空闲线程的存活时间,若存活时间内没有新任务就会被释放 workQueue:任务队列,当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。 threadFactory:线程工厂,负责创建线程 handler:处理策略,当提交的任务过多(线程池与队列中任务数达到上限)时对多出任务执行的策略 ctl:主池控制状态,打包了两个概念字段workerCount与runState,workerCount指线程运行的有效数量,runState指线程的运行状态(运行中、关闭中等),用于位操作获取workerCount与runState

  • runState:提供主要的线程池生命周期控制,并具有以下标记:
    • RUNNING:接受新任务并处理排队的任务
    • SHUTDOWN:不接受新任务,但处理排队的任务
    • STOP:不接受新任务,不处理排队的任务,并中断进行中的任务
    • TIDYING:所有任务都已终止,workerCount为零,转换为TIDYING状态的线程将运行terminated()方法
    • TERMINATED:terminated()调用结束 workers:线程池中的所有工作线程集,仅在持有mainLock时访问 mainLock:用于访问workers及并发控制

全参构造函数

    /**
     * 根据给定的初始参数构造ThreadPoolExecutor实例
     *
     * @param corePoolSize 核心线程数,即使没有任务线程池中依旧保留的线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 线程存活时间
     * @param unit keepAliveTime的时间单位
     * @param workQueue 任务队列
     * @param threadFactory 线程工厂,负责创建线程
     * @param handler 任务过多时的处理策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor内部提供了RejectedExecutionHandler的几种实现策略:

  • DiscardOldestPolicy:抛弃未处理的最旧的任务请求
  • AbortPolicy(默认策略):通过抛出RejectedExecutionException拒绝任务
  • CallerRunsPolicy:直接在execute方法的调用线程中运行拒绝任务,若执行器已关闭,任务将被丢弃
  • DiscardPolicy:直接丢弃新任务

添加执行任务execute()

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取主池控制状态
        int c = ctl.get();
        // 工作线程数 < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 新建线程执行任务
            if (addWorker(command, true))
                return;
            // 
            c = ctl.get();
        }
        // 线程池正常运行 && 任务入队成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新获取避免其它线程改变了ctl的值
            int recheck = ctl.get();
            // 线程池非正常运行则从任务队列中移除任务
            if (! isRunning(recheck) && remove(command))
                // 使用处理策略handler处理该任务
                reject(command);
            // 工作线程为空
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 入队失败,新建非核心工作线程执行任务
        else if (!addWorker(command, false))
            reject(command);
    }
    
    /**
     * 添加执行任务到Worker
     * @param firstTask 
     * @param core 新建的线程是否绑定到核心线程数,true则新建线程属corePoolSize中的线程,false则属maximumPoolSize
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 用于跳出内循环的循环重试标签
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查线程池状态,必要时检查队列是否为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                // 若使用核心线程执行任务且 工作线程数>=核心线程数,则返回false;若使用新建线程执行任务且 工作线程数>=允许的最大线程数,则返回false;
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS自增工作线程数成功则跳出循环标签
                if (compareAndIncrementWorkerCount(c))
                    break retry; 
                c = ctl.get();  // Re-read ctl
                // 重新检测线程池状态,如果与循环一开始的线程池状态不符则跳到标签重新循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 在上锁之前可能会有其它线程对当前新建的worker w进行操作
                mainLock.lock();
                try {
                    // 获取锁的同时重新检查池状态
                    int rs = runStateOf(ctl.get());
                    // 判断线程池状态是否为RUNNING(rs<SHUTDOWN)
                    // 或线程池状态SHUTDOWN状态且新的任务为空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 判断worker维护的线程是否存活
                        // 由于worker是在当前方法新建的,其线程不应该被启动,其维护线程存活即Thread.start()方法已被其它线程调用,状态不合理
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 工作线程集workers中添加当前新建的worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // worker任务添加成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    /**
     * 从任务队列中移除任务
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }
    
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** 当前Worker运行的线程 */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** 线程完成的任务数 */
        volatile long completedTasks;
        
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        ......
    }

根据以上源码可画出以下添加任务的主要流程图: ThreadPoolExecutor-Flowchart.png

submit()与execute()的区别

  • submit()提交的Runnable任务都会被转换为Callable再执行
  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否成功被线程池执行
  • submit()方法用于提交需要返回值的任务,线程池会返回一个Future对象,通过该对象可以判断任务是否执行成功,并且可以通过 Future.get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成

部分public方法解析

  • int getActiveCount():返回正在执行任务线程的大概数量(任务和线程的状态在计算过程中可能会动态变化)
  • int getLargestPoolSize():返回池中曾经同时存在的最大线程数
  • long getTaskCount():返回计划执行的任务大概总数
  • long getCompletedTaskCount():返回已完成执行的任务的大概总数
  • int getPoolSize():返回池中的当前线程数
  • boolean isShutdown():判断池状态是否非RUNNING,池状态为SHUTDOWN、STOP、TIDYING、TERMINATED都会返回true
  • void shutdown():启动有序关闭线程池,在该关闭中执行先前提交的任务,但不接受任何新任务,同时将池状态改为SHUTDOWN
  • List<Runnable> shutdownNow():将池状态更改为STOP,尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表,队列中的任务将被清空
  • boolean isTerminating():池状态处于SHUTDOWN则返回true,表示执行程序在shutdown()shutdownNow()之后正在终止线程池
  • boolean isTerminated():判断池状态是否为TERMINATED

源码摘录:

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 若线程池状态为TIDYING、TERMINATED,则返回0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }
    
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                // 上锁即线程正在执行任务
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }
    
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检测是否有所有线程权限
            checkShutdownAccess();
            // CAS更改池状态
            advanceRunState(SHUTDOWN);
            // 中断正在等待任务的线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    
    // 终端所有workers的线程
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    /**
     * @param onlyOne 是否只终端一个worker的线程
     * 中断workers线程
     */ 
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    ......

以上代码可以看出ThreadPoolExecutor每次对线程工作集合workers进行操作都会使用可重入锁mainLock进行上锁解锁。

使用示例

public class ThreadPoolTest {
    public static void main(String[] args) {
        // 创建线程池工厂设置线程名称格式,execute()任务异常处理器
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("thread-%s")
                // execute()执行过程中出现异常则打印异常线程及异常信息
                .setUncaughtExceptionHandler((thread, throwable) -> {
                    System.err.println("exception thread id:" + thread.getId() + ", thread name:" + thread.getName());
                    throwable.printStackTrace();
                })
                .build();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 10, 1,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), threadFactory);
        // 接收Callable结果
        List<Future<String>> futures = new ArrayList<>();
        try {
            // 控制台可能会打印"线程池已满"
            IntStream.range(0, 24)
                    .forEach(i -> executorService.execute(() -> System.out.println(Thread.currentThread().getName())));
            // 提交Callable任务
            // IntStream.range(0, 24)
            //         .forEach(i -> futures.add(executorService.submit(() -> Thread.currentThread().getName())));
        } catch (RejectedExecutionException e) {
            System.err.println("线程池已满");
        } finally {
            // 关闭线程池,避免处理策略抛出的异常导致线程池关闭失败
            executorService.shutdown();
        }
        // 线程池状态若非TERMINATED则自旋阻塞主线程
        while (!executorService.isTerminated()){

        }
        for (Future<String> fut : futures) {
            try {
                System.out.println(new Date() + "::" + fut.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

上例中创建的线程池核心线程数=5,队列容量=10,池中最大线程数=10,当池中任务队列容量已满且线程数达最大值,即池中任务数>20时就会使用默认的AbortPolicy策略抛出RejectedExecutionException以拒接后续的Runnable任务。由于任务处理效率与任务溢出量(4)不大的原因,以上示例并不一定会打印"线程池已满",如需确认可在Runnable任务中添加Thread.sleep(100)。 由于处理策略抛出的异常会影响后续代码的执行,所以需在finally块中进行线程池的关闭以保证线程池中的资源得到释放。

Executors可创建的线程池

Executors类提供了可创建以下JUC包下的线程池:

  • FixedThreadPool:可重用固定线程数的线程池
  • CachedThreadPool:缓存线程池,通过延长线程存活时间缓存线程
  • ScheduledThreadPool:任务可延迟或定期执行的线程池
  • WorkStealingPool:任务窃取线程池,1.8版本新增的内部池为ForkJoinPool的创建方法

以上线程池并没有相应的同名类,只是一些特定功能线程池的名称,基于JUC原有的线程池类构建时添加一些参数的限制、队列的选择从而构建出特定功能的线程池。

FixedThreadPool

FixedThreadPool中的可重用指的是线程的重用,池中线程数是固定不变的,且任务队列可看作边界(Integer.MAX_VALUE)的。该线程池创建涉及的源码如下:

public class Executors {
    ......
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    ......
}

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    ......
    
     public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    ......
}    

newFixedThreadPool()方法创建的ThreadPoolExecutor参数解读:

  • corePoolSize=maximumPoolSize:线程池中只有核心线程,即使核心线程数已满与队列已满也不会创建新线程执行任务
  • keepAliveTime=0:固定线程数且线程皆为核心线程的线程池设置存活时间没有意义
  • new LinkedBlockingQueue<Runnable>():队列无界,意味着队列永远不会被任务填满,所以也不会因为处理不过来而执行任务拒接策略,核心线程会不断从任务队列中获取并执行任务。除此以外,当任务堆积过多时可能会导致内存OOM,抛出OutOfMemoryError

设置VM options为-Xms64m -Xmx64m,以下程序运行一会后将抛出OOM(同样可适用于CachedThreadPool)。

ExecutorService executorService = Executors.newFixedThreadPool(1);
LinkedList<String> hashList = new LinkedList<>();
Runnable task = () -> {
    byte[] KB = new byte[1024];
    hashList.add(Thread.currentThread().getId() + ":" + Thread.currentThread().getName());
    try {
        // 阻塞,使队列任务剧增
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
boolean flag = true;
while (flag) {
    executorService.execute(task);
}
// OOM无法捕获,所以没有finally shutdown线程池

CachedThreadPool

缓存线程池的实现是通过延长线程池中的线程存活时间以达到线程缓存的目的,该线程池创建涉及的源码如下:

public class Executors {
    ......
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    ......
}

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
 
    ......
 
    public SynchronousQueue() {
        this(false);
    }
    
    // 是否公平队列,true则公平,先进先出,false则无需
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
    
    ......
}

newCachedThreadPool()方法创建的ThreadPoolExecutor参数解读:

  • corePoolSize=0:核心线程数为0,即任务都是直接放入队列,执行队列任务的线程都是非核心线程
  • maximumPoolSize=Integer.MAX_VALUE:最大线程数可看作无界,意味着有多少任务就创建多少线程
  • keepAliveTime=60s:线程存活时间为60s,延迟了线程的生命周期以减少没有核心线程带来的负面影响,可提高短期内执行许多异步任务的程序性能
  • workQueue=SynchronousQueue(false):任务队列是非公平的阻塞队列,对列的每个插入操作都必须等待另一个线程进行相应的删除操作

SynchronousQueue是一个同步阻塞队列,每次插入操作必须等待另一个线程进行相应的删除操作,否则将被阻塞,反之亦然(没有元素取线程将被阻塞直到有元素被插入)。同步队列没有任何内部容量,队列的头是第一个排队的插入线程试图添加到队列中的元素,如果没有这样的线程,poll()将返回null。CacheThreadPool中的执行任务流程图如下:

CacheThreadPool.jpg

虽然线程数的无限制可应对突如其来的高并发,但也存在着承载上限,当创建的线程数过多导致内存不足时,会导致内存OOM,从而程序抛出OutOfMemoryError

ScheduledThreadPool

Executors创建的ScheduledThreadPool对应的类为ScheduledThreadPoolExecutor,创建涉及的源码如下:

public class Executors {
    ......
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    ......
}

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    
    ......
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        // 调用ThreadPoolExecutor的构造方法
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
}

ScheduledThreadPoolExecutor构造参数解读:

  • maximumPoolSize=Integer.MAX_VALUE:最大线程数可看作无界,意味着有多少任务就创建多少线程
  • keepAliveTime=0s:固定线程数且线程皆为核心线程的线程池设置存活时间没有意义
  • workQueue=DelayedWorkQueue():延迟队列,ScheduledThreadPoolExecutor所有的构造方法都是使用该队列,Runnable任务将被强转为可延长执行的RunnableScheduledFuture

DelayedWorkQueue是基于类似于DelayQueue和PriorityQueue基于堆的数据结构,只是每个ScheduledFutureTask也将其索引记录到堆数组中,由上源码可看出ScheduledThreadPool线程池中的任务队列是根据执行时间排序的。为了让关系更清晰,绘制了以下相关类图:

ScheduledThreadPoolExecutor.png

由于ScheduledThreadPoolExecutor实现了ScheduledThreadPoolExecutor,所以相比其它线程池多出了以下几个任务调度方法:

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 指定延迟后执行任务,返回的future get()为空
  • ScheduledFuture<?> schedule(Callable command, long delay, TimeUnit unit) 同上,区别在于返回的future get()Callable结果
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 指定延迟后根据固定的时间间隔周期性执行任务,无论上一个周期任务是否执行完毕都不会影响当前周期任务的执行,即上一个任务与当前任务的开始时间间隔始终为delay unit
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 指定延迟后根据任务的完成时间延迟指定delay时间周期性执行任务,即上一个任务的结束时间与当前任务的开始时间间隔始终为delay unit

scheduleWithFixedDelay()scheduleWithFixedDelay()方法都是将Runnable转换为Callable并封装到ScheduledFutureTask中,然后再丢进延迟队列,具体的相关的核心源码如下:

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
        
    ......
    
    // (0) ScheduledThreadPoolExecutor任务调度之源
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            // (1) 新建调度任务实例,并将Runnable转Callable进行封装
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        // 提交调度任务
        delayedExecute(t);
        return t;
    }
    
    // 延迟执行任务
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // (2) 添加任务到延迟队列中
            super.getQueue().add(task);
            // (3) 判断相关状态是否适合执行新任务 判断线程池是状态是否为SHUTDOWN
            if (isShutdown() &&
                // 当前运行状态能否运行插入的任务,如判断shutdown后延时任务是否被取消
                !canRunInCurrentRunState(task.isPeriodic()) &&
                // 当前状态不能运行,则将任务从队列中移除
                remove(task))
                // 若任务正在执行,则不取消
                task.cancel(false);
            else
                // 相关状态符合执行要求,预建线程
                ensurePrestart();
        }
    }
    
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        // 创建核心线程
        if (wc < corePoolSize)
            addWorker(null, true);
        // 创建普通线程
        else if (wc == 0)
            addWorker(null, false);
    }
    ......
    
    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        // 任务存储队列
        private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        
        ......
        
        // (2) 添加任务
        public boolean add(Runnable e) {
            return offer(e);
        }
        
        // (2) 添加任务
        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            // 将Runnable任务转为RunnableScheduledFuture
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    // 扩容,队列的默认长度为16
                    grow();
                size = i + 1;
                // 任务队列为空,直接添加到下标0
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    // (2.1) 根据任务时间排序将任务插入到队列堆中的合适位置,最接近
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    // (2.2) 当前任务在队列头部,即该任务是即时任务,唤醒线程
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
        
        // 根据任务时间将任务插入到队列堆中的合适位置
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                // 以当前任务数右移3位后对应的队列下标任务进行时间比较
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                // (2.3) 任务时间比较,执行时间越早放得越前
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                // 设置任务所在堆索引,
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
        
        ......
    }
    
    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        ......
        
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);  // 调用父类FutureTask构造方法,通过Executors.callable(runnable, result)将Runnable转为Callable
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
        
        // 任务时间比较
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
        
        ......
    }
           
}

梳理一下以上源码,可得SchduledThreadPoolExecutor添加任务的常规流程如下:

  1. 创建ScheduledFutureTask调度任务实例,将Runnable任务转为Callable
  2. ScheduledFutureTask实例添加到延迟队列DealyedWorkQueue中 2.1 根据任务时间task.time对任务进行排序workQueue.siftUp() -> task.compareTo() 2.2 若添加的任务是即时任务(即位于队列头部),则唤醒一个等待线程
  3. 判断相关状态是否符合条件执行新任务
  • 状态符合且从队列移除成功:新任务未被执行则取消,执行中则不取消,对应task.cancel(false)
  • 其它:预建线程ensurePrestart()

ScheduledThreadPoolExecute.png

WorkStealingPool

fork:分流,分叉

fork/join框架是用Java 7提出的,它提供了一些工具,可通过尝试使用所有可用的处理器内核来帮助加速并行处理,这是通过分而治之的方法来实现的。在fork/join框架中,将任务递归分解(fork)为较小的独立子任务异步执行,然后把执行的子结果合并(join)为单个结果,分解的任务借助线程池提供的工作线程来解决。 在Java中,fork/join框架使用称为ForkJoinPool类的线程池来实现,该线程池管理工作线程类ForkJoinWorkerThread。ForkJoinPool与其他类型的ExecutorService实现的主要区别在于采用了工作窃取:池中的所有线程都会尝试查找和执行提交给池或由其他活动任务创建的任务(如果没有任务,则阻塞等待工作)。Runnable任务无法进行任务拆分,所以Java 7后新增了一个任务可拆分的ForkJoinTask抽象类,可拆分的任务类都需继承该类。ForkJoinPool接收到的Runnable任务实例如果没有集成ForkJoinTask,则将被封装到用于适配ForkJoinPool可执行任务的ForkJoinTask子类RunnableExecuteAction中,但该任务依旧无法被fork/join执行。以下是JDK中WorkStealingPool构建与任务执行的相关源码:

public class Executors {
    ......
    
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    
    ......
}

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
 
    ......
 
    
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
    
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
    
    
    public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            // 将task封装成ForkJoinPool可执行的任务
            job = new ForkJoinTask.RunnableExecuteAction(task);
        externalPush(job);
    }
    
    ......
    
    // 适用于Runnable的适配器
    static final class RunnableExecuteAction extends ForkJoinTask<Void> {
        final Runnable runnable;
        RunnableExecuteAction(Runnable runnable) {
            if (runnable == null) throw new NullPointerException();
            this.runnable = runnable;
        }
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        // 直接执行任务不进行fork/join
        public final boolean exec() { runnable.run(); return true; }
        void internalPropagateException(Throwable ex) {
            rethrow(ex); // rethrow outside exec() catches.
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
    
    ......
}

该线程池涉及很多,使用不多,就此别过。真想了解,可点这个

扩展:Spring Boot快捷配置中的线程池

Spring生态中的spring-context依赖包中有两个封装了ThreadPoolExecutor的任务执行器ThreadPoolTaskExecutorThreadPoolTaskScheduler,这两个任务执行器都可以通过在Spring Boot的配置文件中进行相应线程池配置来生成单例线程池,在一些对任务调度要求不高的场景下使用起来十分方便,项目里同一类型的任务(如CPU密集型任务、I/O密集型任务)使用同一线程池也方便线程的维护。

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor的使用

ThreadPoolTaskExecutor内部维护的线程池执行器实例是ThreadPoolExecutor,前文提及的FixedThreadPool与CachedThreadPool都是通过更改ThreadPoolExecutor的参数来实现的,因此我们也可以在Spring Boot的配置文件中进行相应的配置来生成对应的线程池,在此之前先介绍下ThreadPoolTaskExecutor的部分源码:

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    // 核心线程池数监视器,用于同步更改线程池核心数
    private final Object poolSizeMonitor = new Object();
    
    private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private int queueCapacity = Integer.MAX_VALUE;

	private boolean allowCoreThreadTimeOut = false;

	@Nullable
	private TaskDecorator taskDecorator;
    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;
    
    // 同步更改线程池核心数,避免并发修改配置的情况,如多人在配置中心同时修改了corePoolSize
	public void setCorePoolSize(int corePoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.corePoolSize = corePoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
			}
		}
	}
    
    // 初始化线程池
    @Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        // 创建队列
		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
        ......
		return executor;
	}
    
    ......

	/**
	 * 根据配置的队列容量判断线程池用的队列是LinkedBlockingQueue还是SynchronousQueue
	 */
	protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
		if (queueCapacity > 0) {
			return new LinkedBlockingQueue<>(queueCapacity);
		}
		else {
			return new SynchronousQueue<>();
		}
	}
}

application.yml文件配置范例(大部分使用默认即可):

spring:
  task:
    execution:
      pool:
        # 线程核心数,不配置则默认1
        core-size: 5
        # 不配置则默认使用无界的LinkedBlockingQueue队列,0则使用SynchronousQueue队列
        # queue-capacity: 0
        # 线程存活时间,默认60s,可不配
        # keep-alive: 60s
        # 是否允许核心线程超时,默认true,这样可以动态增加和缩小池
        # allow-core-thread-timeout: true

注入使用测试:

    @Resource
    private ThreadPoolTaskExecutor executor;

    @PostConstruct
    public void init(){
        executor.execute(() -> System.out.println("ThreadPoolTaskExecutor execute task"));
    }

org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler的使用

ThreadPoolTaskSchedulerThreadPoolTaskExecutor最主要的区别是维护的ExecutorService不同,ThreadPoolTaskScheduler的配置都是针对于scheduledExecutor,且应用类需添加@EnableScheduling注解,scheduledExecutor部分源码如下:

public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
    private volatile int poolSize = 1;
    // 如果ScheduledExecutorServiced的任务类ScheduledFutureTask.cancel()后任务该从队列中删除,则为True,默认false,也是配到ScheduledExecutorService中
	private volatile boolean removeOnCancelPolicy = false;

    // 调度任务执行的异常处理器,空则使用默认处理器TaskUtils.getDefaultErrorHandler(true)
	@Nullable
	private volatile ErrorHandler errorHandler;

	@Nullable
	private ScheduledExecutorService scheduledExecutor;
    
    ......
    
    // 初始化scheduledExecutor
    @Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

		if (this.removeOnCancelPolicy) {
			if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
				((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
			}
			else {
				logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
			}
		}
		return this.scheduledExecutor;
	}
    
    protected ScheduledExecutorService createExecutor(
			int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
		return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
	}
}

application.yml文件配置范例(大部分使用默认即可):

spring:
  task:
    # 普通任务线程池配置
    execution:
      pool:
        # 线程核心数,不配置则默认1
        core-size: 5
        # 不配置则默认使用无界的LinkedBlockingQueue队列,0则使用SynchronousQueue队列
        # queue-capacity: 0
        # 线程存活时间,默认60s,可不配
        # keep-alive: 60s
        # 是否允许核心线程超时,默认true,这样可以动态增加和缩小池
        # allow-core-thread-timeout: true
    # 任务调度线程池配置
    scheduling:
      # 线程名前缀,默认"scheduling"
      thread-name-prefix: scheduled-task-
      pool:
        size: 5
      #shutdown:
        # 执行程序是否应在shutdown时等待计划的任务完成,不配默认false
        #await-termination: false
        # 等待剩余任务完成的最长时间,默认空,即上次任务是否完成不影响下次任务的执行
        #await-termination-period: 10s

注入ThreadPoolTaskScheduler的应用一定添加@EnableScheduling注解,否则配置类TaskSchedulingAutoConfiguration无法生成TaskScheduler导致程序将无法启动,使用范例如下:

@SpringBootApplication
@EnableScheduling
public class SimpleWebApplication {
    @Resource
    private ThreadPoolTaskExecutor executor;
    @Resource
    private ThreadPoolTaskScheduler scheduler;

    @PostConstruct
    public void init() {
        // 每5秒打印一次当前时间,参数与ScheduledThreadPoolExecutor有点区别但不大
        scheduler.scheduleAtFixedRate(() -> System.out.println(LocalDateTime.now()),
                Duration.of(5, ChronoUnit.SECONDS));
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleWebApplication.class, args);
    }
}

线程池大小

线程池中的线程数过多时会添加上下文切换的成本,过少时又没有充分利用好CPU的资源。对于线程池线程数的大小设置,根据线程任务类型的不同市面上都有个比较通用的公式,具体如下(N为CPU核心数):

  • CPU(计算)密集型任务(N+1):线程池的任务都是需要CPU资源进行运算的,则可将线程数设置为N+1,比CPU 核心数多出的一个线程是为了防止线程偶发的中断,或者其它原因导致的任务暂停而带来的影响从而使CPU处于空闲状态而没有被充分利用。
  • I/O 密集型任务(2N): I/O任务的性能瓶颈主要在IO上,对CPU资源的占用十分少,IO的阻塞占据了任务的大部分时间,而线程在处理 I/O 的时间段内不会占用CPU来处理,此时就可以将CP让交出给其它线程使用。因此在 I/O 密集型任务的线程池可以多配置一些线程,一般设置为2N

java中可通过Runtime.getRuntime().availableProcessors()获取CPU核心数。

上下文切换(Context Switch)

上下文切换(有时也称做进程切换或任务切换)是指 CPU 从一个进程或线程切换到另一个进程或线程。CPU的进程调度算法为时间片论证,为每个线程分配时间片,当一个线程的时间片用完后无论其任务执行结束与否都必须让出CPU的占用,让出前须将线程里的信息(如变量、执行到的代码行)保存到线程私有区(如栈、程序计数器),以便下次获取到时间片后该线程任务的正常继续运行。 上下文切换通常是计算密集型的,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间且需线程信息的保存,所以一般CPU任务密集型的线程池线程数都不会设置超出核心数太多。

任务类型

  • CPU密集型:需利用CPU的计算机能力对内存中的数据进行运算,如列表的排序、最大值最小值的查找等
  • I/O密集型:大部分时间都用来处理 I/O 交互,比如Socket的连接、文件的读写等

结尾

简单结尾tag