CountDownLatch(闭锁)
闭锁允许一个或者多个线程等待其他线程都完成了才继续执行。CountDownLatch 是一种闭锁的实现,使得一个或多个线程等待一组事情发生。通过计数器表示需要等待的事件数量;使用countDown()
方法将计数器减去1,表示有一个事件发生;使用await()
方法阻塞当前线程,等待计数器为0,也就是所有需要等待的事情发生。
CountDownLatch 不能重新初始化或者修改内部计数器的值。
CountDownLatch 内部依赖内部类 Sync 实现,而 Sync 类继承于 AQS。其内部通过共享锁实现。
示例:
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);
/**
* Boss线程,等待员工到达开会
*/
static class BossThread extends Thread{
@Override
public void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
try {
//Boss等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有人都已经到齐了,开会吧...");
}
}
//员工到达会议室
static class EmpleoyeeThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室....");
//员工到达会议室 count - 1
countDownLatch.countDown();
}
}
public static void main(String[] args){
//Boss线程启动
new BossThread().start();
for(int i = 0 ; i < countDownLatch.getCount() ; i++){
new EmpleoyeeThread().start();
}
}
}
运行结果:
Boss在会议室等待,总共有5个人开会...
Thread-2,到达会议室....
Thread-3,到达会议室....
Thread-1,到达会议室....
Thread-5,到达会议室....
Thread-4,到达会议室....
所有人都已经到齐了,开会吧...
CycliBarrier(屏障)
闭锁是一次性对象,一旦进入终止状态,就不能被重置。屏障类似于闭锁,阻塞一组进程直到某个事件发生。也就是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CycliBarrier 内部使用了可重用锁 ReentrantLock 和 Condition。线程执行await()
方法后,计数器减1,进行等待,直到计数器为0,所有调用了await()
方法的线程继续执行。
CycliBarrier适用于多个线程结果合并的场景。
示例:
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;
static class CyclicBarrierThread extends Thread{
public void run() {
System.out.println(Thread.currentThread().getName() + "到了");
//等待
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("人到齐了,开会吧....");
}
});
for(int i = 0 ; i < 5 ; i++){
new CyclicBarrierThread().start();
}
}
}
运行结果:
Thread-0到了
Thread-1到了
Thread-4到了
Thread-2到了
Thread-3到了
人到齐了,开会吧...
闭锁和屏障的比较
CountDownLatch 的计数器只能使用一次,CyclicBarrier 的计数器可以使用 reset() 方法重置。
闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行。
Semaphore(信号量)
信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。
信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。
信号量内部同样依赖于继承自 AQS 的 Sync 类,且包含两个子类,分别是 FairSync 和 NonfairSync。
信号量通过acquire()
方法获取许可;通过release()
方法释放许可。
示例:
public class SemaphoreCase {
private static class Parking{
private Semaphore semaphore;
Parking(int count){
semaphore=new Semaphore(count);
}
public void park(){
try {
semaphore.acquire();
long time=(long) (Math.random()*10);
System.out.println(Thread.currentThread().getName()+"进入停车场停车"+time+"秒");
Thread.sleep(time);
System.out.println(Thread.currentThread().getName()+"开出停车场");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
private static class Car extends Thread{
Parking parking;
Car(Parking parking){
this.parking=parking;
}
@Override
public void run() {
parking.park();
}
}
public static void main(String[] args) {
Parking parking=new Parking(3);
for(int i=0;i<5;i++){
new Car(parking).start();
}
}
}
运行结果为:
Thread-2进入停车场停车7秒
Thread-1进入停车场停车9秒
Thread-0进入停车场停车4秒
Thread-0开出停车场
Thread-3进入停车场停车4秒
Thread-2开出停车场
Thread-4进入停车场停车3秒
Thread-1开出停车场
Thread-3开出停车场
Thread-4开出停车场
Exchanger(交换者)
Exchanger 是一个用于线程间协作的工具类,用于线程间的数据交换。它提供了一个同步点,在这个同步点,两个线程可以交换彼此的数据。两个线程通过exchange()
方法交换数据,一个线程执行了该方法,会一直等待另一个线程执行该方法,当两个线程都到达同步点,这两个线程就可以交换数据。
示例:
public class ExchangerCase {
private static class Producer implements Runnable{
private List<String> buffer;
private Exchanger<List<String>> exchanger;
Producer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for(int i=1;i<5;i++){
System.out.println("生产者生产次数:"+i);
for(int j=1;j<=3;j++){
System.out.println("生产者装入"+i+"--"+j);
buffer.add("buffer: "+i+"--"+j);
}
System.out.println("生产者装满,等待消费者交换");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable{
private List<String> buffer;
private Exchanger<List<String>> exchanger;
Consumer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for(int i=1;i<5;i++){
try {
buffer=exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费次数:"+i);
for(int j=1;j<=3;j++) {
System.out.println("消费者消费" +buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args) {
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Thread producerThread = new Thread(new Producer(buffer1,exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
producerThread.start();
consumerThread.start();
}
}
运行结果
生产者生产次数:1
生产者装入1--1
生产者装入1--2
生产者装入1--3
生产者装满,等待消费者交换
生产者生产次数:2
生产者装入2--1
生产者装入2--2
生产者装入2--3
生产者装满,等待消费者交换
消费者消费次数:1
消费者消费buffer: 1--1
消费者消费buffer: 1--2
消费者消费buffer: 1--3
消费者消费次数:2
消费者消费buffer: 2--1
消费者消费buffer: 2--2
生产者生产次数:3
生产者装入3--1
消费者消费buffer: 2--3
生产者装入3--2
生产者装入3--3
生产者装满,等待消费者交换
生产者生产次数:4
生产者装入4--1
生产者装入4--2
消费者消费次数:3
消费者消费buffer: 3--1
生产者装入4--3
生产者装满,等待消费者交换
消费者消费buffer: 3--2
消费者消费buffer: 3--3
消费者消费次数:4
消费者消费buffer: 4--1
消费者消费buffer: 4--2
消费者消费buffer: 4--3
在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:
- 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
- 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
- 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。
参考资料
- Java并发编程的艺术
- Java并发编程实战
- cmsblogs.com/?p=2611