在生产环境中,为每个任务分配一个线程存在一些缺陷,尤其是当需要创建大量线程时。
- 线程生命周期的开销非常高。线程的创建和销毁都是有开销的。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
- 稳定性。在可创建线程的数量上存在一个限制。如果破坏了这个限制,很可能抛出OutOfMemoryError异常。
也就是说,在一定范围内,增加线程可以提高系统的吞吐率,但是超过了这个范围,再创建更多的线程只会降低程序的执行速度。
在《阿里巴巴Java手册》中提到:
线程池是指管理一组同构工作线程的资源池。线程从工作队列中获取一个任务,执行任务,然后返回线程等待下一个任务。
在线程池中执行任务比为每一个任务分配一个线程优势更多。通过重用现有的线程而不是创建新线程,可以分摊在线程创建和销毁过程中产生的巨大开销。当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程延迟任务执行,提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程使得处理器保持忙绿,同时还可以防止过多线程互相竞争使得内存耗尽。
因此应该使用线程池。其目的包括:
- 线程是稀缺资源,不能频繁创建
- 线程的创建和执行完全分开,方便维护,进行了解耦
- 将线程放入池中,可以给其他任务复用
Executor框架
Executor框架是在java.util.concurrent包下,在Java 5中引入的。通过Executor来启动线程比使用Thread.start()方法更好,更易于管理且效率更高(用线程池实现节约了开销),并且还有助于避免this逸出。
Executor两级调度模型如下。用户将多个任务提交给Executor框架,框架在线程池中分配线程执行它们。然后操作系统再将这些线程分配给处理器执行。
Executor框架结构图: 框架中的所有类可以分为三类:- 任务:Runnable和Callable
- 任务执行器:Executor,其子接口为ExecutorService,该子接口的实现类有:ThreadPoolExecutor和ScheduledThreadPoolExecutor
- 执行结果:使用Future接口表示异步的执行结果,其实现类为FutureTask。
Executor
Executor是个简单的接口,它支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程与执行过程解耦,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持。
Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。
class TaskExecWebServer{
private static final int NTHREADS=100;
private static final Executor executor=Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket serverSocket=new ServerSocket(8080);
while(true){
final Socket socket=serverSocket.accept();
Runnable task=new Runnable() {
@Override
public void run() {
System.out.println(socket);
}
};
executor.execute(task);
}
}
}
class ThreadPerTaskExecutor implements Executor{
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
class WithinThreadExecutor implements Executor{
@Override
public void execute(Runnable command) {
command.run();
}
}
在上述代码中,我们首先在webServer中使用了一个固定大小为100的线程池。然后使用executor.execute(task)来提交线程。在执行的过程中,可以采用不同的执行方法。可以为每个请求启动一个新的线程,如ThreadPerTaskExecutor,还可以以同步方式执行所有任务,如WithinThreadExecutor。这样就实现了把任务的提交和执行解耦。
ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize
是线程池的基本大小。maximumPoolSize
是线程池的最大线程大小。keepAliveTime
和unit
是线程空闲后的存活时间workQueue
是用于存放任务的阻塞队列handler
是当队列和最大线程池都满了之后的饱和策略
execute()
- 首先获取线程池的状态
- 如果线程池中线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新线程来执行新添加的任务。
- 如果当前线程处于运行状态,并且成功的写入了阻塞队列
- 双重检查,再次获取线程状态;如果线程不是运行状态就需要从阻塞队列移除,并尝试判断线程是否执行完毕。同时执行拒绝策略。
- 如果当前线程池为空就新创建一个线程并执行。
- 如果在第三部的判断为非运行状态,创建新线程,失败则执行拒绝策略。
生命周期
为了解决执行服务的生命周期问题,Executor拓展了ExecutorService接口,添加了一些用于生命周期管理的方法。ExecutorService的生命周期有三种状态:运行、关闭和已终止。
在初始创建时,ExecutorService处于运行状态。使用shutdown()方法将不再接受新的任务,同时等待提交的任务都执行完成之后再关闭。调用shutdownNow()方法,相当于调用了每个线程的interrupt()方法,将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ThreadPoolExecutor中,定义了如下状态:
- RUNNING 是运行状态,可以接受执行队列中的任务
- SHOTDOWN 是调用了shutdown()方法
- STOP 是调用了shutdownNow()方法
- TIDYING 是所有任务都执行完毕的状态,在调用shutdown()或shutdownNow()时都会尝试更新为这个状态
- TERMINATED 是终止状态,当执行terminated()会更新为这个状态
线程池
Executors提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
1.newFixedThreadPool(int nThreads)
创建了固定线程数目的线程池,每当提交一个任务时就创建一个县城,直到达到线程池的最大数量。当线程池满时,有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程被移出。
针对一些稳定的并发线程,多用于服务器。
2.newCachedThreadPool()
创建了一个可以缓存的线程池,没有规模限制。调用execute将重用以前构造的线程(如果线程可用)。如果当前没有可用线程,则创建一个新线程;如果当前规模超过了处理需求,将从缓存中终止并移除已经有60秒未使用的线程。
放入该线程池的线程超过timeout不活动,会被自动终止,缺省设置为60秒。
通常用于执行一些生存期很短的异步型任务。
3.newSingleThreadExecutor()
创建了一个单线程的Executor,创建了单个线程来执行任务,如果线程异常结束,会创建另一个线程来替代。
确保依照任务在队列中的顺序来串行执行(FIFO,LIFO,优先级)。
注意
4.newSechduledThreadPool(int corePoolSize)
与前几种不同的是,这个方法返回的是ScheduledExecutorService,前边三种返回的是ExecutorService。创建了一个固定长度的线程池,并且是支持定时以及周期性的任务执行的线程池。多数情况下用来替代Timer类。
Timer与ScheduledThreadPoolExecutor
Timer用来负责管理延迟任务以及周期任务,但是它存在一些缺陷,应该使用与ScheduledThreadPoolExecutor来替代它。
Timer类在执行所有定时任务时只会创建一个线程,当有多个定时任务时,就会产生延迟。
当多个定时任务中有一个任务抛出异常,所有的任务都无法执行。
Timer执行周期任务时依赖系统时间。后者不会由于系统时间的改变而发生执行的变化。
提交与执行任务
任务分为两类,一类是实现了Runnable接口的类,一类是实现了Callable接口的类,都能被Executor执行。
Runnable是一种有局限的抽象,因为它不能返回值或者抛出一个受检查的异常。
Callable的call()方法类似于Runnable的run()方法,但是call()方法有返回值,并且可能抛出一个异常。
任务的生命周期包括:创建,提交,开始和完成。在Executor框架中,已经提交但尚未开始的任务可以取消,对于已经开始执行的任务,只有当它们响应中断时,才能取消。
向线程池中提交线程的时候有两种方法:execute()和submit()。
execute()提交只能提交一个Runnable对象,且返回值是void。也就是说提交后如果线程运行,和主线程就脱离了关系。
submit()提交:
- Future submit(Callable task)
- Future<?> submit(Runnable task)
- Future submit(Callable task,T result)
submit()方法可以提交一个Callable接口的对象,也能提交一个Ruuable的对象。使用这种方式提交会返回一个Future对象,代表了该线程的执行结果,主线程通过get方法获取到从线程中返回的结果数据。使用Future可以表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消;还能获取任务的结果;并能取消任务。
Future的get()方法:
- 任务已经完成,返回结果或者抛出一个异常
- 任务没有完成,将阻塞直至完成
- 任务抛出异常,get()方法将该异常封装为ExecutionExcepiton并重新抛出,可以使用getCause()来获取被封装的初始异常
- 任务被取消,get()方法将抛出CancellationException
CompletionService
将executor与blockingqueue融合在一起,将Callable任务交由它执行,使用类似队列操作的take和poll等方法获得future。也就是在构造函数中创建了阻塞队列来保存执行完成的结果。
为任务设计时限
使用Future.get(long,timeType)为任务设计时限,时限内有结果则返回,超时则抛出TimeOutException
支持时限的还有invokeAll()方法,它将多个任务提交给ExecutorService并获取结果。其参数为一组任务,返回一组Future,按照任务集合中的顺序将Future添加到返回的集合,实现了二者的关联。当所有任务执行完毕,或超市,或中断,该方法将返回。通过get或isCancelled来判断每个任务的执行情况。