阅读 124

并发艺术(三)预热篇,来聊聊JUC下的并发工具包

前言

我们经常会听到JUC,它其实是并发大师Doug Lea在jdk写的java.util.concurrent.*。其实我们在前面已经讲了AtomicLock,友链:并发艺术(二)一玩到底,玩透JAVA各种锁机制。这些算是开胃菜吧,在探讨并发核心AQS前,这篇我们先来热身一下,玩玩JUC下的并发工具。

Doug Lea大师为我们提供了一些在不使用(无感知)synchronized、lock的特殊场景下的一些并发工具,希望看完这些,能为小伙伴们在以后的工作中有点点帮助。

CountDownLatch

CountDown,倒数,Latch,锁。如果觉得不好理解,我们来看下构造函数:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
复制代码

这里有个sync,我们看下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
复制代码

可以看到,sync继承AbstractQueuedSynchronizer,好吧,这个有点东西,我们后面章节探讨

CountDownLatch构造参数count,

count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}

Doug Lea大师给出了说明,线程通过await()前,必须调用countDown()的次数。

由此,我们可以大致理解下CountDownLatch的作用,来张图吧

我们初始化一个CountDownLatch,count为4,左边的引用CountDownLatch实例的四个countDown()的地方,右边是一个await(),这些可以在一个线程也可以在多个线程,因为我们操作的是CountDownLatch实例对象。

4次countDown()之后,await()的地方就会放行,执行后续代码,未达到的话,就阻塞

CountDownLatch的一些主要方法:

await()

// 阻塞,直到countDown次数与初始化count一直
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码

await(long timeout, TimeUnit unit)

// 阻塞一段时间,时间到继续执行后续代码
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
复制代码

countDown()

public void countDown() {
    // release 1
    sync.releaseShared(1);
}
复制代码

下面来一段demo

public class CountDownLatchDemo {

    static CountDownLatch countDownLatch = new CountDownLatch(4);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                countDownLatch.countDown();
                System.out.println("线程ID:" + Thread.currentThread().getId() + ", countDown");
            }).start();
        }
        
        countDownLatch.await();
        System.out.println("pass...");
    }
}

===============结果====================
线程ID:11, countDown
线程ID:12, countDown
线程ID:13, countDown
线程ID:14, countDown
pass...
复制代码

只有countDown次数==count,才会执行await()后续代码,否则阻塞,下面我们来个超时的:

public class CountDownLatchDemo {

    static CountDownLatch countDownLatch = new CountDownLatch(4);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                try {
                    // 睡眠3s
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
                System.out.println("线程ID:" + Thread.currentThread().getId() + ", countDown");
            }).start();
        }

        // 等待2s
        countDownLatch.await(2000, TimeUnit.MILLISECONDS);
        System.out.println("pass...");
    }
}

============结果================
pass...
线程ID:11, countDown
线程ID:12, countDown
线程ID:13, countDown
线程ID:14, countDown
复制代码

由于等待2s,而线程countDown前睡眠了3s,所有超时后,await()不再阻塞,继续执行。

CyclicBarrier

CyclicBarrier与CountDownLatch有点类似

我们看下构造函数

/**
 * Creates a new {@code CyclicBarrier} that will trip when the
 * given number of parties (threads) are waiting upon it, and which
 * will execute the given barrier action when the barrier is tripped,
 * performed by the last thread entering the barrier.
 *
 * @param parties the number of threads that must invoke {@link #await}
 *        before the barrier is tripped
 * @param barrierAction the command to execute when the barrier is
 *        tripped, or {@code null} if there is no action
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
复制代码

看下Doug Lea的说明,parties,指障碍消除之前await()调用的次数。

同样四个线程,都可以访问CyclicBrrier的同一个实例对象,当每个线程都执行到了await()时,才会消除屏障,放行,除了可以继续执行await()后续的代码,还可以调用一个Runnable,构造参数中我们可以定义。

执行await()的时候,其实也维护了一个count,跟CountDownLatch原理差不多,这里就不看源码了,大家可以自己看下。

这里我们来模拟四个线程及一个barrierAction,其中两个线程率先到达await()

public class CyclicBarrierDemo {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new BarrierAction());

    // 屏障消除会调用此线程
    static class BarrierAction implements Runnable {
        @Override
        public void run() {
            System.out.println("barrierAction......");
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    System.out.println("Thread id :" + Thread.currentThread().getId() + "await start...");
                    cyclicBarrier.await();
                    System.out.println("Thread id :" + Thread.currentThread().getId() + "await end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    System.out.println("Thread id :" + Thread.currentThread().getId() + "await start...");
                    // 睡眠1s
                    Thread.sleep(1000);
                    cyclicBarrier.await();
                    System.out.println("Thread id :" + Thread.currentThread().getId() + "await end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

=================结果=======================
Thread id :11await start...
Thread id :12await start...
Thread id :13await start...
Thread id :14await start...
barrierAction......
Thread id :14await end...
Thread id :13await end...
Thread id :12await end...
Thread id :11await end...
复制代码

可以看到,虽然其中两个线程睡眠了1s,但是结果还是等所有线程都执行完,才做后续执行,并且调用了我们定义的BarrierAction

CyclicBarrier通过对await()方法提供了超时机制,与CountDownLatch类似,可自行尝试。

Exchanger

Exchanger稍稍有些不同,它主要用于两个线程的数据交换,使用场景比较局限。

我们来操作一波

public class ExchangerDemo {

    static Exchanger exchanger = new Exchanger();

    static class ExchangeThread extends Thread {

        private int n;

        ExchangeThread(int n) {
            this.n = n;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread() + " 原值 " + n);
                this.n = (int) exchanger.exchange(n);
                System.out.println(Thread.currentThread() + " 交换后值 " + n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExchangeThread thread1 = new ExchangeThread(5);
        ExchangeThread thread2 = new ExchangeThread(10);

        thread1.start();
        thread2.start();
    }
}

================结果=====================
Thread[Thread-0,5,main] 原值 5
Thread[Thread-1,5,main] 原值 10
Thread[Thread-1,5,main] 交换后值 5
Thread[Thread-0,5,main] 交换后值 10
复制代码

可以看到Thread-0原值5,交换后变成10。Thread-1原值10,交换后变成5。

同样Exchanger也提供超时处理机制,大家可以自己尝试一下。

Semaphore

Semaphore意为信号量,所谓的量是一个数目,而信号则代表个体的状态,个人理解。有点类似连接池,假设总共10个连接,不可扩,用的时候申请一个,如果有,有获取连接,没有就等待,用完之后将连接池放回去。

构造函数

/**
 * Creates a {@code Semaphore} with the given number of
 * permits and the given fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 * @param fair {@code true} if this semaphore will guarantee
 *        first-in first-out granting of permits under contention,
 *        else {@code false}
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码

我们看到,里面也用到了sync,这个后面介绍。permits为许可数量,fair代表是公平还是非公平,这个前面章节有介绍公平锁和非公平锁。

Semaphore主要有方法

// 获取许可
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码
// 释放许可
public void release() {
    sync.releaseShared(1);
}
复制代码

我们模拟一个简单的限流,有n个人,只有拿到令牌的人才能doSomething,其他没有令牌的得等着。

public static void main(String[] args) {
    Token token = new Token();
    // 初始化1个令牌
    token.init(1);

    // 初始化3个线程,假定代表三个人
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                token.acquire();
                System.out.println("线程ID: " + Thread.currentThread().getId() + "获取令牌");
                
                // doSomething
                Thread.sleep(1000);

                System.out.println("线程ID: " + Thread.currentThread().getId() + "放回令牌");
                token.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

static class Token {

    private Semaphore semaphore;

    public void init(int n) {
        semaphore = new Semaphore(n);
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }

    public void release() throws InterruptedException {
        semaphore.release();
    }
}

====================结果===========================
线程ID: 11获取令牌
线程ID: 11放回令牌
线程ID: 12获取令牌
线程ID: 12放回令牌
线程ID: 13获取令牌
线程ID: 13放回令牌
复制代码

我们可以看到同时只有一个人在doSomething。

Semaphore也提供一些超时机制,可以自行尝试。

方法 备注
tryAcquire() 尝试获取,成功返回true,失败返回false
tryAcquire(long timeout, TimeUnit unit) 尝试一定时间

总结

今天主要介绍了JUC下得一些并发工具,每个工具都有不同的使用场景:

工具 线程安全 场景
CountDownLatch AQS 适用于n个线程操作是某个线程操作的前置
CyclicBarrier Lock 适用于一组线程达到某个屏障后,继续向后执行或调用其他线程
Exchanger ThreadLocal、CAS 适用于两个线程达到某个点后进行数据交换,场景比较局限
Semaphore AQS 适用于控制多个线程能同时访问一个资源的个数

可以根据平时的业务场景不同,来选择合适的并发工具。

下章将介绍并发核心AQS和Condition

关注下面的标签,发现更多相似文章
评论