Java并发编程实战笔记4:线程池

820 阅读9分钟

在生产环境中,为每个任务分配一个线程存在一些缺陷,尤其是当需要创建大量线程时。

  • 线程生命周期的开销非常高。线程的创建和销毁都是有开销的。
  • 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
  • 稳定性。在可创建线程的数量上存在一个限制。如果破坏了这个限制,很可能抛出OutOfMemoryError异常。

也就是说,在一定范围内,增加线程可以提高系统的吞吐率,但是超过了这个范围,再创建更多的线程只会降低程序的执行速度。

在《阿里巴巴Java手册》中提到:

线程池是指管理一组同构工作线程的资源池。线程从工作队列中获取一个任务,执行任务,然后返回线程等待下一个任务。

在线程池中执行任务比为每一个任务分配一个线程优势更多。通过重用现有的线程而不是创建新线程,可以分摊在线程创建和销毁过程中产生的巨大开销。当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程延迟任务执行,提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程使得处理器保持忙绿,同时还可以防止过多线程互相竞争使得内存耗尽。

因此应该使用线程池。其目的包括:

  • 线程是稀缺资源,不能频繁创建
  • 线程的创建和执行完全分开,方便维护,进行了解耦
  • 将线程放入池中,可以给其他任务复用

Executor框架

Executor框架是在java.util.concurrent包下,在Java 5中引入的。通过Executor来启动线程比使用Thread.start()方法更好,更易于管理且效率更高(用线程池实现节约了开销),并且还有助于避免this逸出。

Executor两级调度模型如下。用户将多个任务提交给Executor框架,框架在线程池中分配线程执行它们。然后操作系统再将这些线程分配给处理器执行。

Executor框架结构图:
框架中的所有类可以分为三类:

  1. 任务:Runnable和Callable
  2. 任务执行器:Executor,其子接口为ExecutorService,该子接口的实现类有:ThreadPoolExecutor和ScheduledThreadPoolExecutor
  3. 执行结果:使用Future接口表示异步的执行结果,其实现类为FutureTask。

Executor

Executor是个简单的接口,它支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程与执行过程解耦,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持。

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。

Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。

class TaskExecWebServer{
    private static final int NTHREADS=100;
    private static final Executor executor=Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket=new ServerSocket(8080);
        while(true){
            final Socket socket=serverSocket.accept();
            Runnable task=new Runnable() {
                @Override
                public void run() {
                    System.out.println(socket);
                }
            };
            executor.execute(task);
        }
    }
}

class ThreadPerTaskExecutor implements Executor{
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

class WithinThreadExecutor implements Executor{
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

在上述代码中,我们首先在webServer中使用了一个固定大小为100的线程池。然后使用executor.execute(task)来提交线程。在执行的过程中,可以采用不同的执行方法。可以为每个请求启动一个新的线程,如ThreadPerTaskExecutor,还可以以同步方式执行所有任务,如WithinThreadExecutor。这样就实现了把任务的提交和执行解耦。

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 
  • corePoolSize是线程池的基本大小。
  • maximumPoolSize是线程池的最大线程大小。
  • keepAliveTimeunit是线程空闲后的存活时间
  • workQueue是用于存放任务的阻塞队列
  • handler是当队列和最大线程池都满了之后的饱和策略

execute()

  1. 首先获取线程池的状态
  2. 如果线程池中线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新线程来执行新添加的任务。
  3. 如果当前线程处于运行状态,并且成功的写入了阻塞队列
  4. 双重检查,再次获取线程状态;如果线程不是运行状态就需要从阻塞队列移除,并尝试判断线程是否执行完毕。同时执行拒绝策略。
  5. 如果当前线程池为空就新创建一个线程并执行。
  6. 如果在第三部的判断为非运行状态,创建新线程,失败则执行拒绝策略。

生命周期

为了解决执行服务的生命周期问题,Executor拓展了ExecutorService接口,添加了一些用于生命周期管理的方法。ExecutorService的生命周期有三种状态:运行、关闭和已终止。

在初始创建时,ExecutorService处于运行状态。使用shutdown()方法将不再接受新的任务,同时等待提交的任务都执行完成之后再关闭。调用shutdownNow()方法,相当于调用了每个线程的interrupt()方法,将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

在ThreadPoolExecutor中,定义了如下状态:

  • RUNNING 是运行状态,可以接受执行队列中的任务
  • SHOTDOWN 是调用了shutdown()方法
  • STOP 是调用了shutdownNow()方法
  • TIDYING 是所有任务都执行完毕的状态,在调用shutdown()或shutdownNow()时都会尝试更新为这个状态
  • TERMINATED 是终止状态,当执行terminated()会更新为这个状态

线程池

Executors提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

1.newFixedThreadPool(int nThreads)

创建了固定线程数目的线程池,每当提交一个任务时就创建一个县城,直到达到线程池的最大数量。当线程池满时,有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程被移出。

针对一些稳定的并发线程,多用于服务器。

2.newCachedThreadPool()

创建了一个可以缓存的线程池,没有规模限制。调用execute将重用以前构造的线程(如果线程可用)。如果当前没有可用线程,则创建一个新线程;如果当前规模超过了处理需求,将从缓存中终止并移除已经有60秒未使用的线程。

放入该线程池的线程超过timeout不活动,会被自动终止,缺省设置为60秒。

通常用于执行一些生存期很短的异步型任务。

3.newSingleThreadExecutor()

创建了一个单线程的Executor,创建了单个线程来执行任务,如果线程异常结束,会创建另一个线程来替代。

确保依照任务在队列中的顺序来串行执行(FIFO,LIFO,优先级)。

注意

4.newSechduledThreadPool(int corePoolSize)

与前几种不同的是,这个方法返回的是ScheduledExecutorService,前边三种返回的是ExecutorService。创建了一个固定长度的线程池,并且是支持定时以及周期性的任务执行的线程池。多数情况下用来替代Timer类。

Timer与ScheduledThreadPoolExecutor

Timer用来负责管理延迟任务以及周期任务,但是它存在一些缺陷,应该使用与ScheduledThreadPoolExecutor来替代它。

Timer类在执行所有定时任务时只会创建一个线程,当有多个定时任务时,就会产生延迟。

当多个定时任务中有一个任务抛出异常,所有的任务都无法执行。

Timer执行周期任务时依赖系统时间。后者不会由于系统时间的改变而发生执行的变化。

提交与执行任务

任务分为两类,一类是实现了Runnable接口的类,一类是实现了Callable接口的类,都能被Executor执行。

Runnable是一种有局限的抽象,因为它不能返回值或者抛出一个受检查的异常。

Callable的call()方法类似于Runnable的run()方法,但是call()方法有返回值,并且可能抛出一个异常。

任务的生命周期包括:创建,提交,开始和完成。在Executor框架中,已经提交但尚未开始的任务可以取消,对于已经开始执行的任务,只有当它们响应中断时,才能取消。

向线程池中提交线程的时候有两种方法:execute()和submit()。

execute()提交只能提交一个Runnable对象,且返回值是void。也就是说提交后如果线程运行,和主线程就脱离了关系。

submit()提交:

  • Future submit(Callable task)
  • Future<?> submit(Runnable task)
  • Future submit(Callable task,T result)

submit()方法可以提交一个Callable接口的对象,也能提交一个Ruuable的对象。使用这种方式提交会返回一个Future对象,代表了该线程的执行结果,主线程通过get方法获取到从线程中返回的结果数据。使用Future可以表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消;还能获取任务的结果;并能取消任务。

Future的get()方法:

  • 任务已经完成,返回结果或者抛出一个异常
  • 任务没有完成,将阻塞直至完成
  • 任务抛出异常,get()方法将该异常封装为ExecutionExcepiton并重新抛出,可以使用getCause()来获取被封装的初始异常
  • 任务被取消,get()方法将抛出CancellationException

CompletionService

将executor与blockingqueue融合在一起,将Callable任务交由它执行,使用类似队列操作的take和poll等方法获得future。也就是在构造函数中创建了阻塞队列来保存执行完成的结果。

为任务设计时限

使用Future.get(long,timeType)为任务设计时限,时限内有结果则返回,超时则抛出TimeOutException

支持时限的还有invokeAll()方法,它将多个任务提交给ExecutorService并获取结果。其参数为一组任务,返回一组Future,按照任务集合中的顺序将Future添加到返回的集合,实现了二者的关联。当所有任务执行完毕,或超市,或中断,该方法将返回。通过get或isCancelled来判断每个任务的执行情况。

参考资料