Java并发7:并发工具类

849 阅读7分钟

CountDownLatch(闭锁)

闭锁允许一个或者多个线程等待其他线程都完成了才继续执行。CountDownLatch 是一种闭锁的实现,使得一个或多个线程等待一组事情发生。通过计数器表示需要等待的事件数量;使用countDown()方法将计数器减去1,表示有一个事件发生;使用await()方法阻塞当前线程,等待计数器为0,也就是所有需要等待的事情发生。

CountDownLatch 不能重新初始化或者修改内部计数器的值。

CountDownLatch 内部依赖内部类 Sync 实现,而 Sync 类继承于 AQS。其内部通过共享锁实现。

示例:

public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(5);

    /**
     * Boss线程,等待员工到达开会
     */
    static class BossThread extends Thread{
        @Override
        public void run() {
            System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
            try {
                //Boss等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("所有人都已经到齐了,开会吧...");
        }
    }

    //员工到达会议室
    static class EmpleoyeeThread  extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ",到达会议室....");
            //员工到达会议室 count - 1
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args){
        //Boss线程启动
        new BossThread().start();

        for(int i = 0 ; i < countDownLatch.getCount() ; i++){
            new EmpleoyeeThread().start();
        }
    }
}

运行结果:

Boss在会议室等待,总共有5个人开会...
Thread-2,到达会议室....
Thread-3,到达会议室....
Thread-1,到达会议室....
Thread-5,到达会议室....
Thread-4,到达会议室....
所有人都已经到齐了,开会吧...

CycliBarrier(屏障)

闭锁是一次性对象,一旦进入终止状态,就不能被重置。屏障类似于闭锁,阻塞一组进程直到某个事件发生。也就是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CycliBarrier 内部使用了可重用锁 ReentrantLock 和 Condition。线程执行await()方法后,计数器减1,进行等待,直到计数器为0,所有调用了await()方法的线程继续执行。

CycliBarrier适用于多个线程结果合并的场景。

示例:

public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier;

    static class CyclicBarrierThread extends Thread{
        public void run() {
            System.out.println(Thread.currentThread().getName() + "到了");
            //等待
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("人到齐了,开会吧....");
            }
        });

        for(int i = 0 ; i < 5 ; i++){
            new CyclicBarrierThread().start();
        }
    }
}

运行结果:

Thread-0到了
Thread-1到了
Thread-4到了
Thread-2到了
Thread-3到了
人到齐了,开会吧...

闭锁和屏障的比较

CountDownLatch 的计数器只能使用一次,CyclicBarrier 的计数器可以使用 reset() 方法重置。

闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行。

Semaphore(信号量)

信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。

信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。

信号量内部同样依赖于继承自 AQS 的 Sync 类,且包含两个子类,分别是 FairSync 和 NonfairSync。

信号量通过acquire()方法获取许可;通过release()方法释放许可。

示例:

public class SemaphoreCase {
    private static class Parking{
        private Semaphore semaphore;

        Parking(int count){
            semaphore=new Semaphore(count);
        }

        public void park(){
            try {
                semaphore.acquire();
                long time=(long) (Math.random()*10);
                System.out.println(Thread.currentThread().getName()+"进入停车场停车"+time+"秒");
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName()+"开出停车场");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }

    private static class Car extends Thread{
        Parking parking;
        Car(Parking parking){
            this.parking=parking;
        }

        @Override
        public void run() {
            parking.park();
        }
    }

    public static void main(String[] args) {
        Parking parking=new Parking(3);
        for(int i=0;i<5;i++){
            new Car(parking).start();
        }
    }
}

运行结果为:

Thread-2进入停车场停车7秒
Thread-1进入停车场停车9秒
Thread-0进入停车场停车4秒
Thread-0开出停车场
Thread-3进入停车场停车4秒
Thread-2开出停车场
Thread-4进入停车场停车3秒
Thread-1开出停车场
Thread-3开出停车场
Thread-4开出停车场

Exchanger(交换者)

Exchanger 是一个用于线程间协作的工具类,用于线程间的数据交换。它提供了一个同步点,在这个同步点,两个线程可以交换彼此的数据。两个线程通过exchange()方法交换数据,一个线程执行了该方法,会一直等待另一个线程执行该方法,当两个线程都到达同步点,这两个线程就可以交换数据。

示例:

public class ExchangerCase {
    private static class Producer implements Runnable{
        private List<String> buffer;
        private Exchanger<List<String>> exchanger;
        Producer(List<String> buffer,Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }
        @Override
        public void run() {
            for(int i=1;i<5;i++){
                System.out.println("生产者生产次数:"+i);
                for(int j=1;j<=3;j++){
                    System.out.println("生产者装入"+i+"--"+j);
                    buffer.add("buffer: "+i+"--"+j);
                }
                System.out.println("生产者装满,等待消费者交换");
                try {
                    exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static class Consumer implements Runnable{
        private List<String> buffer;
        private Exchanger<List<String>> exchanger;

        Consumer(List<String> buffer,Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }

        @Override
        public void run() {
            for(int i=1;i<5;i++){
                try {
                    buffer=exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者消费次数:"+i);
                for(int j=1;j<=3;j++) {
                    System.out.println("消费者消费" +buffer.get(0));
                    buffer.remove(0);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();

        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        Thread producerThread = new Thread(new Producer(buffer1,exchanger));
        Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));

        producerThread.start();
        consumerThread.start();
    }
}

运行结果

生产者生产次数:1
生产者装入1--1
生产者装入1--2
生产者装入1--3
生产者装满,等待消费者交换
生产者生产次数:2
生产者装入2--1
生产者装入2--2
生产者装入2--3
生产者装满,等待消费者交换
消费者消费次数:1
消费者消费buffer: 1--1
消费者消费buffer: 1--2
消费者消费buffer: 1--3
消费者消费次数:2
消费者消费buffer: 2--1
消费者消费buffer: 2--2
生产者生产次数:3
生产者装入3--1
消费者消费buffer: 2--3
生产者装入3--2
生产者装入3--3
生产者装满,等待消费者交换
生产者生产次数:4
生产者装入4--1
生产者装入4--2
消费者消费次数:3
消费者消费buffer: 3--1
生产者装入4--3
生产者装满,等待消费者交换
消费者消费buffer: 3--2
消费者消费buffer: 3--3
消费者消费次数:4
消费者消费buffer: 4--1
消费者消费buffer: 4--2
消费者消费buffer: 4--3

在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:

  • 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
  • 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
  • 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。

参考资料