多线程笔记(三)

360 阅读4分钟

为什么要用线程池

线程是不是越多越好? 1、线程不仅java中是一个对象,每个线程都有自己的工作内存,

   > 线程创建、销毁需要时间,消耗性能。
   > 线程过多,会栈用很多内存

2、操作系统需要频繁切换线程上下文(大家都想被运行),影响性能。

3、如果创建时间+ 销毁时间 > 执行任务时间 就很不合算

线程池的推出,就是为了方便的控制线程数量

线程池的原理

1、线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;

2、工作线程:线程池中线程,可以循环的执行任务,在没有任务时处于等待状态;

3、任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

4、任务队列:用于存放没有处理的任务。提供一种缓冲机制

类的层次结构

代码实现

public class ThreadPoolDemo {
	public void command(ThreadPoolExecutor threadPoolExecutor) {
		
		for (int i = 0; i < 30; i++) {
			int n = i;
			threadPoolExecutor.submit(new Runnable() {
				@Override
				public void run() {
					try {
						System.out.println(n + "开始执行");
						Thread.sleep(3000);
						System.out.println(n + "执行完成");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
			System.out.println("任务:" + i + "执行成功");
		}
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("当前线程数" + threadPoolExecutor.getPoolSize());
			System.out.println("当队列数" + threadPoolExecutor.getQueue().size());
		}
	}
	
	public void test1() {
		//创建线程池
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
				5, //核心线程数
				10, //最大线程数
				5, //存活时间
				TimeUnit.SECONDS, //单位秒
				new LinkedBlockingQueue<Runnable>(5),//队列
				new RejectedExecutionHandler() { //拒绝策略
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						System.out.println("有任务被拒绝了");
					}
				});
		
		command(threadPoolExecutor);
	}
	
	public static void main(String[] args) {
		ThreadPoolDemo t = new ThreadPoolDemo();
		t.test1();
	}
}

线程池API-Executors工具类

你可以自己实例化线程池,也可以用Executors 创建线程池的工厂类,常用方法如下:

newFixedThreadPool(int nThreads) 创建一个固定大小、任务队列容量无界的线程池。核心线程数=最大线程数。

newCachedThreadPool() 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果 池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE

newSingleThreadExecutor() 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一 个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1) 的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。

newScheduledThreadPool(int corePoolSize) 能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE

线程数量

如何确定合适数量的线程

计算型任务:cpu数量的1-2倍

IO型任务:相对比计算型任务,需多一些线程,要根据具体的IO阻塞时长进行考量决定。 也可考虑根据需要在一个最小数量和最大数量间自动增减线程数。

如tomcat中默认的最大线程数为:200

了解一下同步队列

public void synchronizedTest() {
		//同步队列等于没有容量的队列,如果put一个数据,没有take就会一直阻塞,直到take
		SynchronousQueue<String> queue = new SynchronousQueue<String>();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("before  put");
					queue.put("测试");
					System.out.println("after  put");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		try {
			Thread.sleep(100L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				try {
					System.out.println("before  take");
					queue.take();
					System.out.println("after  take");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
	}

手写一个线程池的实现

//手写线程池
public class FixedSizeThreadPool {
	//1.需要一个仓库(队列)
	private BlockingQueue<Runnable> blockingQueue;
	
	//2.工作线程的集合
	private List<Thread> works;
	
	//3.普通线程执行多个task,封装
	public class Work extends Thread{
		private FixedSizeThreadPool pool;
		
		public Work(FixedSizeThreadPool pool) {
			this.pool = pool;
		}
		
		@SuppressWarnings("unused")
		@Override
		public void run() {
			while(this.pool.isworking || this.pool.blockingQueue.size() > 0) {
				Runnable task = null;
				if(this.pool.isworking ) {
					try {
						this.pool.blockingQueue.take();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				} else {
					this.pool.blockingQueue.poll();
				}
				if(task != null) {
					task.run();
				}
			}
		}
	}
	//初始化
	public FixedSizeThreadPool(int poolSize, int queueSize){
		if(poolSize <= 0 || queueSize <= 0) {
			throw new IllegalAccessError("参数异常");
		}
		
		this.blockingQueue = new LinkedBlockingQueue<Runnable>(queueSize);
		
		this.works = Collections.synchronizedList(new ArrayList<Thread>());
		
		for (int i = 0; i < poolSize; i++) {
			Work work = new Work(this);
			work.start();
			works.add(work);
		}
	}
	//非阻塞的
	public boolean submit(Runnable task) {
		if(isworking) {
			return this.blockingQueue.offer(task);
		} else {
			return false;
		}
	}
	//阻塞的
	public void execute(Runnable task) {
		if(isworking){
			try {
				this.blockingQueue.put(task);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private volatile boolean isworking = true;

    //关闭线程池
    //a. 禁止往队列提交任务
    //b. 等待仓库中的任务执行
    //c. 关闭的时候,再去那任务就不用阻塞,因为不会有新任务来了
    //d. 关闭的时候,阻塞的线程,就要强行中断
	public void shutdown() {
		isworking = false;
		for (Thread thread : works) {
			if(thread.getState().equals(Thread.State.WAITING) || 
			   thread.getState().equals(Thread.State.BLOCKED)) {
				thread.interrupt();
			}
		}
	}
}