Java JUC工具包之CyclicBarrier与CountDownLatch

477 阅读6分钟

假设有5个人约好一起去旅行,那么一般来说有2种组织出行方式,一种是自己组织自由行,另一种是跟团旅行。

在Java的JUC工具包中,有两个工具类可以类比这两种旅行方式,分别是CyclicBarrier和CountDownLatch。

两种旅行模式

1.自由旅行模式(CyclicBarrier)

旅行出发当天,5个人(按照约好的时间和地点)相互等待。先到的人,等待未到的人,一旦人齐了,集体成员就同时出发。

对应到CyclicBarrier类,就是:

有多个工作者线程,其中先准备好的线程,阻塞等待(await())未准备好的线程。一旦所有线程都准备好了,各个线程就同时执行任务。

关键知识点

  • 多个工作者线程;(其实1个也可以,但没意义)
  • 线程之间相互阻塞等待(await())彼此之间准备好
  • 所有线程马上同时执行

应用场景

CyclicBarrier一般应用于对公平性要求比较高,特别是需要同时执行的场景。

游戏类有很多适合的场景。例如剪子石头布比输赢,掷骰子比大小。

以掷骰子为例,先点击开始按钮一方的投掷线程,将会等待其他投掷线程。一旦所有游戏方都点击了开始按钮,所有投掷线程就同时运行并投掷出骰子。

2.跟团旅行模式(CountDownLatch)

旅行出发或结束当天,司机(按照约好的时间和地点)等待5名团员。每名团员到达后都进行签到(意味着未到人数减一),一旦人齐了,司机就立马开车。

对应到CountDownLatch类,就是:

有一到多个等待者线程,等待(await())一到多个被等待者线程做好准备或完成任务(countDown())。一旦所有被等待者线程都准备好或完成任务了,等待者线程就马上执行任务。

关键知识点

  • 一到多个等待者线程;
  • 一到多个被等待者线程;
  • 所有等待者线程,等待(await())所有被等待者线程准备好
  • 被等待者线程准备(countDown(),签到)完毕,所有等待者线程马上同时执行

应用场景

由于CountDownLatch多了一个countDown()方法,可以更灵活的使用。因此,其应用场景也相对广泛一些。

通常来说,CountDownLatch更适合用于等待一批线程准备好或完成指定任务,然后再进行下一步操作的场景。也就是说,等待者线程与被等待者线程之间存在先后依赖的执行顺序。

数据聚合。启动多个线程去抓取网页数据、查询接口数据或执行计算任务,另一个线程等待前者完成后,进行数据聚合相关操作。

投票选举主节点。候选节点启动多个线程,请求其他节点同意选自己为主节点。候选节点等待所有线程返回投票结果,如果收到半数以上同意选票则提升自己为主节点。

示例代码

CyclicBarrier

示例1

public class CyclicBarrierTest1 {

	private static int SIZE = 5;
	private static CyclicBarrier cb;

	public static void main(String[] args) {

		cb = new CyclicBarrier(SIZE);

		// 新建5个任务
		for (int i = 0; i < SIZE; i++)
			new InnerThread().start();
	}

	static class InnerThread extends Thread {
		public void run() {
			try {
				System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

				Thread.sleep(2000);
				// 将cb的参与者数量加1
				cb.await();

				// cb的参与者数量等于5时,才继续往后执行
				System.out.println(Thread.currentThread().getName() + " continued.");
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

执行结果:
Thread-1 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-3 continued.
Thread-4 continued.
Thread-0 continued.
Thread-2 continued.
Thread-1 continued.

示例2

CyclicBarrier支持在所有线程都准备好之后,执行特定的操作(构造函数的第2个参数指定的操作)。

public class CyclicBarrierTest2 {

	private static int SIZE = 5;
	private static CyclicBarrier cb;

	public static void main(String[] args) {

		cb = new CyclicBarrier(SIZE, new Runnable() {
			public void run() {
				System.out.println("CyclicBarrier's parties is: " + cb.getParties());
			}
		});

		// 新建5个任务
		for (int i = 0; i < SIZE; i++)
			new InnerThread().start();
	}

	static class InnerThread extends Thread {
		public void run() {
			try {
				System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
				
				Thread.sleep(2000);
				// 将cb的参与者数量加1
				cb.await();

				// cb的参与者数量等于5时,才继续往后执行
				System.out.println(Thread.currentThread().getName() + " continued.");
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

执行结果:
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-1 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-0 continued.
Thread-1 continued.

CountDownLatch

示例1

public class CountDownLatchTest1 {

	private static int LATCH_SIZE = 5;
	private static CountDownLatch doneSignal;

	public static void main(String[] args) {

		try {
			doneSignal = new CountDownLatch(LATCH_SIZE);

			// 创建5个任务,即被等待线程
			for (int i = 0; i < LATCH_SIZE; i++)
				new InnerThread().start();

			new Thread(new Runnable() {
				public void run() {
					System.out.println("submain await begin.");
					try {
						// 子线程等待
						doneSignal.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("submain await finished.");
				}
			}).start();

			// "主线程"等待线程池中5个任务的完成
			System.out.println("main await begin.");
			doneSignal.await();
			System.out.println("main await finished.");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 被等待线程
	 *
	 */
	static class InnerThread extends Thread {
		public void run() {
			try {
				Thread.sleep(1000);
				
				System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
				// 线程签到,计数器减1
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

执行结果:
main await begin.
submain await begin.
Thread-2 sleep 1000ms.
Thread-0 sleep 1000ms.
Thread-3 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
main await finished.
submain await finished.

示例2

想像一种场景,当线程即是等待者线程,同时又是被等待者线程时,会是什么效果。答案是,可以实现CyclicBarrier的相互等待,线程同时执行的效果。

public class CountDownLatchTest3 {

	private static int SIZE = 5;
	private static CountDownLatch doneSignal;

	public static void main(String[] args) {

		doneSignal = new CountDownLatch(SIZE);

		// 新建5个任务
		for (int i = 0; i < SIZE; i++)
			new InnerThread().start();
	}

	static class InnerThread extends Thread {
		public void run() {
			try {
				System.out.println(Thread.currentThread().getName() + " wait for CountDownLatch.");

				Thread.sleep(2000);
				
				// 签到,计数器减1
				doneSignal.countDown();
				// 等待所有线程就绪
				doneSignal.await();

				// cb的参与者数量等于5时,才继续往后执行
				System.out.println(Thread.currentThread().getName() + " continued.");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

执行结果:
Thread-0 wait for CountDownLatch.
Thread-3 wait for CountDownLatch.
Thread-1 wait for CountDownLatch.
Thread-4 wait for CountDownLatch.
Thread-2 wait for CountDownLatch.
Thread-2 continued.
Thread-1 continued.
Thread-4 continued.
Thread-0 continued.
Thread-3 continued.

因此,如果要实现多个线程同时开始执行,可以使用CyclicBarrier或CountDownLatch来实现,前者显然更加直接明了。同时,还可以使用锁(synchronized、Lock)结合计数器进行实现。思路都是类似的:线程先相互等待,如果都就绪了,就开始执行。

最后,还要再说明一下,文章中提到的旅行模式,其实是我为了方便理解记忆而引入的一个概念,重在理解。


题图:timgsa.baidu.com

个人公众号

更多文章,请关注公众号:二进制之路

二进制之路