Java并发工具类(栅栏CyclicBarrier)

4,464 阅读2分钟

并发工具类系列:

Java并发工具类(闭锁CountDownLatch)

Java并发工具类(栅栏CyclicBarrier)

Java并发工具类(信号量Semaphore)

CyclicBarrier适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在下一个步骤之前等待,直到所有任务都完成。栅栏和闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。

闭锁用于等待事件,而栅栏是线程之间彼此等待,等到都到的时候再决定做下一件事。可以参考Java并发工具类(闭锁CountDownLatch)

拿运动员的事情举例,运动员们跑到终点,互相等待所有人都到达终点后,再一起去做喝酒这件事。(运动员也许不能喝酒的,也许大家再跑一轮。)

下面用一个赛马程序来举例:

赛马

package concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

class Horse implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b) {barrier = b;}
    public synchronized int getStrides() {return strides;}
    public void run() {
        try {
            while (!Thread.interrupted()) {  //线程内不断循环
                synchronized (this) {
                    strides += rand.nextInt(3);   //每次马可以走0,1或者2步
                }
                barrier.await();  //走完后,就等所有其它马也走完,才能开始下一回合
            }
        } catch (InterruptedException e) {

        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "Horse " + id + " ";
    }

    public String tracks() {
        StringBuilder s =new StringBuilder();
        for(int i = 0; i < getStrides();i++)
            s.append("*");   //这里打印每个马走的轨迹
        s.append(id);
        return s.toString();
    }
}
public class HorseRace {
    static final int FINISH_LINE = 75;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            @Override
            public void run() {
                StringBuilder s = new StringBuilder();
                for (int i = 0; i < FINISH_LINE; i++) {
                    s.append("="); //打印赛道
                }
                System.out.println(s);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());  //打印每匹马的轨迹
                }
                for (Horse horse : horses) {
                  if (horse.getStrides() >= FINISH_LINE) {
                      System.out.println(horse + "won!");   //每次检查,如果哪匹马到终点了,终止所有线程
                      exec.shutdownNow();
                      return;
                  }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause); //每走完一轮,暂停一小会输出
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });

        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);  //所有马的线程开始执行
        }
    }

    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        new HorseRace(nHorses, pause);
    }
}

我们假设赛道长为75,马每次能走0,1或者2步,每次走完一轮后,互相等待。一旦所有马越过栅栏,它就会自动为下一回合的比赛做好准备。读者可以运行我的程序,在控制台上可以展示出一定的动画效果。

上面的例子中,我们向CyclicBarrier提供一个“栅栏动作”,它是一个Runnable,当计数值到达0时自动执行,这是CyclicBarrier和CountDownLatch之间的另一个区别。

public CyclicBarrier(int parties, Runnable barrierAction)

除此之外,CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。