为什么要用线程池
线程是不是越多越好? 1、线程不仅java中是一个对象,每个线程都有自己的工作内存,
> 线程创建、销毁需要时间,消耗性能。
> 线程过多,会栈用很多内存
2、操作系统需要频繁切换线程上下文(大家都想被运行),影响性能。
3、如果创建时间+ 销毁时间 > 执行任务时间 就很不合算
线程池的推出,就是为了方便的控制线程数量
线程池的原理
1、线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
2、工作线程:线程池中线程,可以循环的执行任务,在没有任务时处于等待状态;
3、任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列:用于存放没有处理的任务。提供一种缓冲机制
类的层次结构
代码实现public class ThreadPoolDemo {
public void command(ThreadPoolExecutor threadPoolExecutor) {
for (int i = 0; i < 30; i++) {
int n = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println(n + "开始执行");
Thread.sleep(3000);
System.out.println(n + "执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("任务:" + i + "执行成功");
}
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程数" + threadPoolExecutor.getPoolSize());
System.out.println("当队列数" + threadPoolExecutor.getQueue().size());
}
}
public void test1() {
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, //核心线程数
10, //最大线程数
5, //存活时间
TimeUnit.SECONDS, //单位秒
new LinkedBlockingQueue<Runnable>(5),//队列
new RejectedExecutionHandler() { //拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("有任务被拒绝了");
}
});
command(threadPoolExecutor);
}
public static void main(String[] args) {
ThreadPoolDemo t = new ThreadPoolDemo();
t.test1();
}
}
线程池API-Executors工具类
你可以自己实例化线程池,也可以用Executors 创建线程池的工厂类,常用方法如下:
newFixedThreadPool(int nThreads) 创建一个固定大小、任务队列容量无界的线程池。核心线程数=最大线程数。
newCachedThreadPool() 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果 池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE
newSingleThreadExecutor() 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一 个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1) 的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。
newScheduledThreadPool(int corePoolSize) 能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE
线程数量
如何确定合适数量的线程
计算型任务:cpu数量的1-2倍
IO型任务:相对比计算型任务,需多一些线程,要根据具体的IO阻塞时长进行考量决定。 也可考虑根据需要在一个最小数量和最大数量间自动增减线程数。
如tomcat中默认的最大线程数为:200
了解一下同步队列
public void synchronizedTest() {
//同步队列等于没有容量的队列,如果put一个数据,没有take就会一直阻塞,直到take
SynchronousQueue<String> queue = new SynchronousQueue<String>();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("before put");
queue.put("测试");
System.out.println("after put");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("before take");
queue.take();
System.out.println("after take");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
手写一个线程池的实现
//手写线程池
public class FixedSizeThreadPool {
//1.需要一个仓库(队列)
private BlockingQueue<Runnable> blockingQueue;
//2.工作线程的集合
private List<Thread> works;
//3.普通线程执行多个task,封装
public class Work extends Thread{
private FixedSizeThreadPool pool;
public Work(FixedSizeThreadPool pool) {
this.pool = pool;
}
@SuppressWarnings("unused")
@Override
public void run() {
while(this.pool.isworking || this.pool.blockingQueue.size() > 0) {
Runnable task = null;
if(this.pool.isworking ) {
try {
this.pool.blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
this.pool.blockingQueue.poll();
}
if(task != null) {
task.run();
}
}
}
}
//初始化
public FixedSizeThreadPool(int poolSize, int queueSize){
if(poolSize <= 0 || queueSize <= 0) {
throw new IllegalAccessError("参数异常");
}
this.blockingQueue = new LinkedBlockingQueue<Runnable>(queueSize);
this.works = Collections.synchronizedList(new ArrayList<Thread>());
for (int i = 0; i < poolSize; i++) {
Work work = new Work(this);
work.start();
works.add(work);
}
}
//非阻塞的
public boolean submit(Runnable task) {
if(isworking) {
return this.blockingQueue.offer(task);
} else {
return false;
}
}
//阻塞的
public void execute(Runnable task) {
if(isworking){
try {
this.blockingQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private volatile boolean isworking = true;
//关闭线程池
//a. 禁止往队列提交任务
//b. 等待仓库中的任务执行
//c. 关闭的时候,再去那任务就不用阻塞,因为不会有新任务来了
//d. 关闭的时候,阻塞的线程,就要强行中断
public void shutdown() {
isworking = false;
for (Thread thread : works) {
if(thread.getState().equals(Thread.State.WAITING) ||
thread.getState().equals(Thread.State.BLOCKED)) {
thread.interrupt();
}
}
}
}