一、简介
CyclicBarrier是并发包中提供的一个同步辅助类,可以使一定数量的线程全部在栅栏位置处汇集,parties的线程才能继续往下执行。当线程到达栅栏位置时调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。CyclicBarrier内部是基于ReentrantLock和Condition(AQS内部类)进行实现,维护一个parties的线程数。和CountDownLatch的区别是1、CyclicBarrier可以被重用。2、CountDownLatch是通过AQS的共享模式进行实现,CyclicBarrier是基于ReentrantLock和Condition,以及内部维护一个parties的线程数进行实现,对ReentrantLock不清楚的,可以看下我的另一篇对ReentrantLock的源码分析juejin.cn/post/684490…。基于Java8。
二、属性
//独占锁,属性generation、 count不是线程安全需要在独占锁加锁操作的前提下,对ReentrantLock不清楚的可以看下我的另一篇https://juejin.cn/post/6844903878882754567
private final ReentrantLock lock = new ReentrantLock();
//独占锁的条件变量,等待所有线程都到达栅栏位置的所有线程,都是在条件队列中等待所有线程都到达栅栏位置
private final Condition trip = lock.newCondition();
//还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,重新赋值给count
private final int parties;
//最后一个到达栅栏的线程需要执行的任务
private final Runnable barrierCommand;
//CyclicBarrier栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程中,此属性不是线程安全的
private Generation generation = new Generation();
//还未到达栅栏位置的线程数量,有一个线程到达栅栏count就做减1操作,此属性的操作需要在ReentrantLock独占锁的条件下
private int count;
三、内部类
//栅栏的代数,有一代被打破,CyclicBarrier就不能再使用了,即等待在栅栏的其中一个线程在等待的过程中
private static class Generation {
//如果栅栏被打破,broken就会置为true
boolean broken = false;
}
四、构造函数
/**
* 传入parties和barrierAction构造CyclicBarrier实例
*
* @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,重新赋值给count
* @param barrierAction 最后一个到达栅栏的线程需要执行的任务
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
//如果传入进来的parties小于等于0,抛出IllegalArgumentException异常
if (parties <= 0) throw new IllegalArgumentException();
//将传入进来的parties赋值给属性parties
this.parties = parties;
//将传入进来的parties赋值给属性count
this.count = parties;
//将传入进来的barrierAction任务赋值给属性barrierAction
this.barrierCommand = barrierAction;
}
/**
* 只传入parties构造CyclicBarrier实例
*
* @param parties 还未到达栅栏位置的起始线程数量,主要是CyclicBarrier重复使用的时候,重新赋值给count
*/
public CyclicBarrier(int parties) {
//调用上面介绍的CyclicBarrier构造函数,barrierAction任务传入空
this(parties, null);
}
五、等待所有线程到达
/**
* 返回当前线程第几个到达栅栏,当前线程等待其他所有线程到达栅栏,否则会阻塞到其他线程都到达栅栏被唤醒
*
* @return 当前线程到达栅栏的指数,即第几个到达栅栏
* @throws InterruptedException 如果当前线程在等待其他所有的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常
* @throws BrokenBarrierException 如果CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
//等待其他所有线程都到达栅栏,会一直阻塞到所有线程到达栅栏被唤醒,或者其中一个线程被中断破坏了栅栏,也会被唤醒,不支持超时的调用下面介绍的dowait方法
return dowait(false, 0L);
} catch (TimeoutException toe) {//由于是一直阻塞到被唤醒,为此不会抛出超时异常,由于TimeoutException是编译异常,为此需要对其进行捕获
//不会发生,如果发生直接将超时异常封装成Error
throw new Error(toe);
}
}
/**
* 返回当前线程第几个到达栅栏,当前线程超时的等待其他所有线程到达栅栏,否则会超时的等待到其他线程都到达栅栏被唤醒,如果超时其他线程还都没有全部到达栅栏,会抛出TimeoutException异常
*
* @return 当前线程到达栅栏的指数,即第几个到达栅栏
* @throws InterruptedException 如果当前线程在等待其他所有的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常
* @throws BrokenBarrierException 如果CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常
* @throws TimeoutException 如果超时其他线程还都没有全部到达栅栏,会抛出TimeoutException异常
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
//超时的调用下面介绍的dowait方法
return dowait(true, unit.toNanos(timeout));
}
/**
* 返回当前线程第几个到达栅栏,当前线程等待其他所有线程到达栅栏,否则会阻塞到其他线程都到达栅栏被唤醒,支持当前线程超时的等待和非超时的等待
*
* @param timed 是否超时等待其他线程到达栅栏
* @param nanos 超时的时间参数
* @return 当前线程到达栅栏的指数,即第几个到达栅栏
* @throws InterruptedException 如果当前线程在等待其他所有的线程也到达栅栏时被中断,会抛出中断异常,此时栅栏已经不能被使用了,Generation已经被打破,broken已经为true,再次使用栅栏会抛出BrokenBarrierException异常
* @throws BrokenBarrierException 如果CyclicBarrier栅栏的Generation已经被打破,即Generation的属性broken为true,当前线程再次使用会抛出BrokenBarrierException异常
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取独占锁,由于generation、count属性都不是线程安全的,为此需要在独占锁下才能操作
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//获取CyclicBarrier栅栏的代信息属性generation
final Generation g = generation;
//判断栅栏是否被打破,即generation的broken属性,如果CyclicBarrier栅栏被打破,broken属性为true,CyclicBarrier栅栏不能再被使用
if (g.broken)
//如果CyclicBarrier栅栏被打破,抛出BrokenBarrierException异常
throw new BrokenBarrierException();
//判断当前线程是否被中断,如果被中断,调用下面介绍的breakBarrier方法
if (Thread.interrupted()) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
breakBarrier();
//抛出InterruptedException异常
throw new InterruptedException();
}
//将count先做减一操作,再赋值给index,表示当前线程第几个到达栅栏,在当前线程被唤醒时,做为返回值返回
int index = --count;
//如果当前线程是最后一个到达CyclicBarrier栅栏的线程
if (index == 0) {
//最后一个到达栅栏的线程执行任务是否成功标识,再finally中需要做对应的处理,唤醒其他对待的线程,有可能传入进来的任务执行抛出异常
boolean ranAction = false;
try {
//获取到需要最后一个到达栅栏的线程执行的任务,可能为空
final Runnable command = barrierCommand;
//如果任务不为空
if (command != null)
//当前最后一个到达CyclicBarrier栅栏的线程执行任务,调用任务的run方法
command.run();
//如果任务不为空,执行成功,ranAction标识为置为true
ranAction = true;
//调用下面介绍的nextGeneration方法,唤醒所有等待其他线程都到达栅栏的线程
nextGeneration();
//返回0,因为是最后一个到达栅栏的线程
return 0;
} finally {
//如果最后一个到达CyclicBarrier栅栏的线程执行不为空的任务失败
if (!ranAction)
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
breakBarrier();
}
}
//循环直到其他线程都到达栅栏,或者栅栏不可用,或者等待的线程被中断,或者等待其他线程都到达栅栏超时
for (;;) {
try {
//如果不支持超时
if (!timed)
//调用Condition的await方法,Condition不清楚的可以看我的另一篇AQS源码对Condition的分析https://juejin.cn/post/6844903872939425805
trip.await();
//如果支持超时,并且超时时间nanos小于0,调用await支持超时会抛出超时异常
else if (nanos > 0L)
//如果支持超时timed为true,并且nanos大于0,调用Condition的await的超时方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {//如果线程在等待其他所有线程到大栅栏的过程中,被中断
//如果栅栏的代信息对象没有改变,并且代信息对象的broken属性为false,栅栏没有被打破,为什么代信息对象会改变,因为调用下面介绍的reset的方法,会改变代信息
if (g == generation && ! g.broken) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
breakBarrier();
//抛出线程被中断异常
throw ie;
} else {
//如果上面的栅栏的代信息已经改变,或者栅栏已经被打破,即generation的属性broken为true,重置线程的中断标志位,因为异常线程的中断标志位会被重置
Thread.currentThread().interrupt();
}
}
//如果CyclicBarrier栅栏已经被打破,即generation的属性broken为true
if (g.broken)
//抛出BrokenBarrierException异常
throw new BrokenBarrierException();
//如果栅栏的代信息已经改变,主动调用reset方法,会改变栅栏的代信息
if (g != generation)
//返回线程第几个到达栅栏
return index;
//如果超时的调用await方法,即timed为true,并且nanos小于等于0
if (timed && nanos <= 0L) {
//调用下面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
breakBarrier();
//抛出超时异常
throw new TimeoutException();
}
}
} finally {
//释放独占锁
lock.unlock();
}
}
//重置CyclicBarrier栅栏的代信息,唤醒所有等待其他线程都到达栅栏的线程,以及将count重置为parties
private void nextGeneration() {
//唤醒所有等待其他线程都到达栅栏的线程
trip.signalAll();
//将count重置为parties
count = parties;
//重置CyclicBarrier栅栏的代信息
generation = new Generation();
}
//将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
private void breakBarrier() {
//将generation的broken属性置为true
generation.broken = true;
//将count重置为parties
count = parties;
//唤醒所有等待其他线程都到达栅栏的线程
trip.signalAll();
}
六、其他方法
//判断CyclicBarrier栅栏是否被打破,即CyclicBarrier栅栏的代信息generation的属性broken是否为true
public boolean isBroken() {
//获取独占锁,由于generation属性不是线程安全的,为此需要在独占锁下才能操作
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//返回CyclicBarrier栅栏的代信息generation的属性broken,如果为true表明栅栏被打破
return generation.broken;
} finally {
//释放独占锁
lock.unlock();
}
}
//重置CyclicBarrier栅栏
public void reset() {
//获取独占锁,由于generation、count属性都不是线程安全的,为此需要在独占锁下才能操作
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//调用上面介绍的breakBarrier方法,将generation的broken属性置为true,表示CyclicBarrier栅栏被打破不能再被使用,将属性count重新赋值为parties属性值,唤醒所有等待其他线程都到达栅栏的线程
breakBarrier(); // break the current generation
//重置CyclicBarrier栅栏的代信息,唤醒所有等待其他线程都到达栅栏的线程,以及将count重置为parties
nextGeneration(); // start a new generation
} finally {
//释放独占锁
lock.unlock();
}
}
//获取有多少线程在等待其他线程到达栅栏
public int getNumberWaiting() {
//获取独占锁,由于count属性都不是线程安全的,为此需要在独占锁下才能操作
final ReentrantLock lock = this.lock;
//加独占锁
lock.lock();
try {
//使用还未到达栅栏位置的起始线程数量parties减去目前还未到达栅栏位置的线程数量,获取到有多少线程在等待其他线程到达栅栏
return parties - count;
} finally {
//释放独占锁
lock.unlock();
}
}