阅读 171

并发编程(5)ThreadPoolExecutor原理解析

概述

由于线程的创建跟销毁是比较消耗资源的,也是比较耗时的。可能为了程序的需要,我们会创建很多线程,所以很有必要对线程进行一个统一的管理,所以就出现了线程池。通过线程池,我们可以重复利用一些线程资源,同时可以统一管理应用内的线程,防止内存泄露。

运行机制

Executor
当我们创建一个任务之后,放进线程池之后,线程池会做如下判断

  • 1.判断核心线程池里的线程是否都在执行任务:否的话则将新任务放入线程池中进行执行,否则进行下一步。
  • 2.判断缓存队列是否未满:是的话,将新任务放入缓存队列,否则进行下一步
  • 3.判断线程池的线程是否都处于工作状态:是的话就就执行线程抛弃策略,否则就执行当前任务

继承关系

ScheduledThreadPoolExecutor

Executor 接口定义了线程池最基本的方法,提交Runnable 任务

public interface Executor {
    void execute(Runnable command);
}
复制代码

ExecutorService 扩充了提交任务的类型,并且定义了线程池关闭任务的方法。

ExecutorService

AbstractExecutorService 是抽象类,主要是对ExecutorService 的一些具体实现 ThreadPoolExecutor 是最核心的一个类,下面会具体分析其源码。 ScheduledThreadPoolExecutor则是在 在 ThreadPoolExecutor 的基础上增加了时间调度的功能

成员变量

	 //32-3=29,线程数量所占位数
	private static final int COUNT_BITS = Integer.SIZE – 3;    
	//低29位表示最大线程数,2的29次幂-1
	private static final int CAPACITY = (1 << COUNT_BITS) – 1;    
	//线程池自身的状态
	
	//符号位101
    private static final int RUNNING    = -1 << COUNT_BITS;
    //高3位000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
     //高3位001
    private static final int STOP       =  1 << COUNT_BITS;
   //高3位010
    private static final int TIDYING    =  2 << COUNT_BITS;
     //高3位011
    private static final int TERMINATED =  3 << COUNT_BITS;
    //缓存队列,等待中的线程任务队列
    private final BlockingQueue<Runnable> workQueue;
    //线程池中工作的线程集合
    private final HashSet<Worker> workers = new HashSet<>();
    //最大线程数
    private int largestPoolSize;
    //完成任务的线程数量
    private long completedTaskCount;
    //创建线程池的工厂类
    private volatile ThreadFactory threadFactory;
    //线程池丢弃策略
    private volatile RejectedExecutionHandler handler;
    //在等待执行任务的线程的最大等待时间
    private volatile long keepAliveTime;
    //核心线程数
    private volatile int corePoolSize;
    //线程池最大可容纳的线程数
    private volatile int maximumPoolSize;
    //默认的线程丢弃策略
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    //int 型变量,低3位表示线程池状态,剩余的位数表示最大线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

复制代码

构造方法

constructors
其实前面的三个构造方法最终都调用了最后一个构造方法,所以就来看看最后一个构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
复制代码

参数比较多,下面来解释一下

  • corePoolSize核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过- corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:keepAliveTime的时间单位,有如下几种取值
时间单位 解释
TimeUnit.DAYS
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分钟
TimeUnit.SECONDS
TimeUnit.MILLISECONDS 毫秒
TimeUnit.MILLISECONDS 微妙
TimeUnit.NANOSECONDS 纳秒
  • workQueue : 一个阻塞队列,用来存储等待执行的任务,参考下图

BlockingQueue

阻塞队列,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

  • ArrayBlockingQueue(有界队列): FIFO 队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小

  • LinkedBlockingQueue(无界队列):FIFO 队列,大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。

  • PriorityBlockingQueue:优先级队列, 类似于LinkedBlockingQueue,但队列中元素非 FIFO, 依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序

  • SynchronousQueue(直接提交策略): 交替队列,队列中操作时必须是先放进去,接着取出来,交替着去处理元素的添加和移除

threadFactory::创建线程池的工厂

RejectedExecutionHandler: 线程丢弃策略,常见的有如下几种

丢弃策略 解释
DiscardPolicy 丢弃任务,但是不抛出异常
CallerRunsPolicy 由调用线程处理该任务
AbortPolicy 丢弃任务并抛出RejectedExecutionException
DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

execute方法

提交Runnable任务

execute方法

    public void execute(Runnable command) {
	    //为空的话,抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //判断加入当前任务后,线程数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
        //如果小于核心线程数,则将立即执行当前任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //核心线程数满了之后,先判断线程池的状态,然后判断缓存队列是否还能进行缓存
        if (isRunning(c) && workQueue.offer(command)) {
	        //如果两者都满足条件,则将线程加入缓存队列
            int recheck = ctl.get();
            //如果当前任务没有在运行已经被移除,执行线程丢弃策略
            if (!isRunning(recheck) && remove(command))
                reject(command);
             // 正在运行的线程数如果是0,则直接运行当前线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //加入当前任务失败,则执行丢弃策略
        else if (!addWorker(command, false))
            reject(command);
    }

复制代码

addWorker方法

有两个参数,一个是firstTask,表示加入的Runnable任务,一个是core,表示是否添加到核心线程。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry://定义了循环的名称,便于后面直接中断循环
        for (;;) {
            int c = ctl.get();//获取状态与数量的标志位
            int rs = runStateOf(c);//判断线程状态
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
//线程池处于关闭状态,firstTask为null,或者缓存队列为空,返回false
                return false;
                //死循环
            for (;;) {
                int wc = workerCountOf(c);//获取线程池数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //比对核心线程数与最大线程数
                    return false;
                if (compareAndIncrementWorkerCount(c))
                //添加线程成功,中断循环
                    break retry;
                c = ctl.get();  //重新获取线程状态与数量的标志位
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
	        //创建一个心的worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              final ReentrantLock mainLock = this.mainLock;
               //给当前线程上锁
                mainLock.lock();
                try {
            //获取当前线程池状态跟线程数的标记为
              int rs = runStateOf(ctl.get());
               if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
        throw new IllegalThreadStateException();
                        //将worker添加到缓存队列中去
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                         //添加成功,改变标记位
                        workerAdded = true;
                    }
                } finally {
                //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
	                //添加成功之后,开启线程执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //添加失败,调用addWorkerFailed方法
               addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

submit

FutureTask

继承关系

先看一下FutureTask的继承关系

FutureTask

Runnable 很常见的接口,定义了run方法

public interface Runnable {
  public abstract void run();
}
复制代码

Future 带有返回值的泛型接口

public interface Future<V> {
    boolean isCancelled();//任务是否取消
    boolean isDone();//任务是否完成
    //同步方法,任务执行的返回值
    V get() throws InterruptedException, ExecutionException;
    //timeout后获取等待结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
        }
复制代码

RunnableFuture 继承自Runnable,Future的泛型返回接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
复制代码

Callable //带有返回值的Runnable额接口

public interface Callable<V> {
  V call() throws Exception;
}
复制代码
构造方法

Callable构造方法

   public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;    
    }

复制代码

Runnable+Result构造方法

public FutureTask(Runnable runnable, V result) {
   this.callable = Executors.callable(runnable, result);
   this.state = NEW;   
    }
复制代码

可以看到不管是Callable还是Runnable构造方法,最后都是使用Callable来进行构造的,之所以这么做,是因为FutureTask需要返回值

提交任务

提交Runnable任务

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

复制代码

提交callable任务+返回值


    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

复制代码

提交callable任务

    public <T> Future<T> submit(Callable<T> task) {
	     //首先判断是否为空
        if (task == null) throw new NullPointerException();
        //将Callable转换成Future
        RunnableFuture<T> ftask = newTaskFor(task);
        //执行execute方法
        execute(ftask);//最后依然会调用execute Runnable方法
        return ftask;
    }
复制代码

不管是提交什么样task,最后都会被包装成Runnable方法来执行,还是会调用Executor的execute方法。

tryTerminate

这个方法是当线程池关闭的时候会调用

    final void tryTerminate() {
     //开启死循环
        for (;;) {
            //获取线程池状态跟数量的标志位
            int c = ctl.get();
            //判断三个条件
            //1.线程是否在运行
            //2.线程池状态小于TIDYING,TERMINATED
            //3.线程池已经关闭并且队列为空
            满足上面的任意一个条件就会直接返回,很好理解
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !   workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { 
            // 如果线程数不为0,才有资格去终止
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                    //CAS设置状态成功,调用terminated,默认空实现
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
复制代码

advanceRunState();

更改当前线程池的状态

  private void advanceRunState(int targetState) {
        for (;;) {
        //获取当前线程的状态及数量的标志位
            int c = ctl.get();
        //更改线程状态
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
复制代码

shutdown

 public void shutdown() {
		 //锁住线程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
    // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
       // 中断等待中的线程
            interruptIdleWorkers();
            onShutdown();
        } finally {
        //释放锁
            mainLock.unlock();
        }
        tryTerminate();
    }
复制代码

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
          //传入Stop状态,其余跟shutdown保持一致
            advanceRunState(STOP);
          //中断所有线程
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
复制代码

shutdownNow() 和 shutdown()的大体流程相似,差别是:

  • 1、advanceRunState传入的是Stop
  • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
  • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

Executors

Executors是Java提供的一个线程池的帮助类,可以帮助我们快速的处理线程池。

构造线程池

由于Java的线程池的构造方法比较复杂,所以Java又提供了Executors这个辅助类,帮助我们更快速地创建ThreadPoolExecutor,可以帮助我们创建4种类型的ThreadPool

Executors_create

  • 单线程异步队列:Executors.newSingleThreadExecutor(),创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行>所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池>保证所有任务的执行顺序按照任务的提交顺序执行。

  • 周期性调度:Executors.newFixedThreadPool(),创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  • 可缓存的线程:Executors.newCachedThreadPool(int size),创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

  • 多线程周期性调度:Executors.newScheduledThreadPool(1),创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

Callable转换

executors_change

参考资料

www.cnblogs.com/trust-freed…

zhuanlan.zhihu.com/p/27232156

评论