阅读 106

【并发编程】J.U.C 之 AQS 介绍、实现及其子类使用演示

AQS 介绍

AQS全称AbstractQueuedSynchronizer.

AQS底层的数据结构是双向链表,是队列的一种实现,因此也可以把它当作一个队列。

其中Sync queue是同步队列,其head节点主要用于后期调度。这里的head节点就是占用资源的线程,后面的都是等待资源的线程。

下面的Condition queue 是一个单项链表,它不是必须的,只有当程序中需要使用到condition的时候才会存在,并且可能会有多个condition queue

AQS的设计思想:

  • 1.使用Node实现FIFO队列,可用于构建锁或者其他同步装置的基础框架。
  • 2.利用了一个int类型表示状态。(state)
  • 3.使用方法是继承。
  • 4.子类通过继承并通过实现它的方法管理状态(acquirerelease)的方法操纵状态。
  • 5.可以同时实现排他锁和共享锁模式。

具体实现的大致思路:

AQS内部维护了一个CLH队列来管理锁,线程会首先尝试获取锁,如果失败就将当前线程以及等待状态等信息包成一个Node节点,加入到同步队列Sync queue中,接着会不断循环尝试获取锁,条件是当前节点为head的直接后继才会尝试,如果失败就会阻塞自己,直到自己被唤醒。当持有锁的线程释放锁的时候会唤醒队列中的后继线程。

AQS同步组件

基于这些设计思路JDK提供了很多基于AQS的子类。

  • CountDownLatch:闭锁,通过计数来保证线程是否需要一直阻塞。
  • Semaphore:能控制同一时间并发线程的数目。
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

CountDownLatch

CountDownLatch是一个同步辅助类,通过它可以完成类似于阻塞当前线程的功能,也就是一个或多个线程一直等待直到其他线程执行完成。CountDownLatch用了一个给定的计数器来进行初始化,该计数器的操作是原子操作,即同时只能有一个线程操作该计数器,调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法时计数器的值变成0,每次调用countDown时计数器的值会减1,当计数器的值为0时所有因await方法而处于等待状态的线程就会继续执行。这种操作至多出现一次,因为这里的计数器是不能被重置的。

使用场景

并行计算,处理量很大时可以将运算任务拆分成多个子任务,当所有子任务都完成之后,父任务再将所有子任务都结果进行汇总。

Coding演示

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++){
            final int thradNum = i;
            executorService.execute(() -> {
                try {
                    test(thradNum);
                } catch (Exception e){
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
复制代码

运行结果

可以看到finish是在所有方法都执行完后才执行的,也就是计数器减到0了才能执行。

Semaphore

类似于操作系统里的信号量,可以控制某个资源可被同时访问的线程数。

Coding演示

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++){
            final int thradNum = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(thradNum);
                    semaphore.release(); // 释放一个许可
                } catch (Exception e){
                    log.error("exception", e);
                }
            });
        }
        executorService.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
复制代码

运行结果是以每次3个输出的,总共输出20个。

实际开发中有时需要拿到多个许可才允许执行,下面来演示一下需要获取多个许可的情况。

semaphore.acquire(3); // 获取3个许可
test(thradNum);
semaphore.release(3); // 释放3个许可
复制代码

这样运行以后就是每次只输出1条结果,因为Semaphore总共只有3个许可,而这里一次操作需要获取3个许可。

当线程数过多时我们可以使用尝试获取许可的方法

if (semaphore.tryAcquire()){
    test(thradNum);
    semaphore.release(); // 释放一个许可
}
复制代码

这种情况下如果获取不到许可就会被直接丢弃。

运行结果只输出了3条结果。

我们来看一下tryAcquire方法的实现

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
复制代码

可以看到传入的参数为申请的许可数,也就是获取多个许可。

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
复制代码

这里的参数为超时时间和时间单位,意味着在尝试获取许可时可以等待一段时间。

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
复制代码

这里就是结合上面两种来使用的情况。

CyclicBarrier

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待直到到达某个公共的屏障点,通过它可以完成多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作。它也是通过计数器来实现,当某个线程调用await方法后就进入等待状态,计数器执行加一操作。当计数器的值达到了设置的初始值时等待状态的线程会被唤醒继续执行。由于CyclicBarrier在释放等待线程后可以重用,所以可以称之为循环屏障。

注意: CyclicBarrier的计数器可以重置。

Coding演示

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
复制代码

运行结果

ready状态时日志是每秒输出一条,当有5条ready时会一次性输出5条continue。这就是前面讲的全部线程准备就绪后同时开始执行。

在初始化CyclicBarrier时还可以在等待线程数后指定一个runnable,含义是当线程到达这个屏障时优先执行这里的runnable

private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
    log.info("call back is ready.");
});
复制代码

运行结果

ReentrantLock

Java中的锁主要分成两类:

  • synchronized修饰的锁
  • J.U.C 里提供的锁

ReentrantLock属于第二类,下面通过与第一类锁的对比来学习这个锁

ReentrantLock(可重入锁)和synchronized 区别

  • 1.锁的实现:synchronized是依赖于jvm实现的,ReentrantLock是依赖jdk实现的。
  • 2.性能的区别:在sync优化以后,两者性能差不多,都可用时更建议使用sync,因为它的写法更容易。
  • 3.功能的区别:锁的灵活性ReentrantLock优于sync。
  • 4.ReentrantLock可以指定是公平锁还是非公平锁,而sync只能是非公平锁。
  • 5.ReentrantLock提供了一个Condition类,可以分组唤醒需要的线程。sync只能随机唤醒一个线程或者唤醒全部线程。
  • 6.ReentrantLock提供了能够中断等待锁的线程的机制。

ReentrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁,它性能比较好的原因之一就是避免了线程进入内核态的阻塞状态。

Coding演示

@Slf4j
public class LockExample2 {

    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行线程数
     */
    public static int threadTotal = 200;

    public static int count = 0;

    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }

    }
}
复制代码

执行结果始终是5000。

我们点进ReentrantLock来看一下它做了什么事情

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
复制代码

这里默认给了一个不公平的锁。

也可以通过传入true或者false来选择公平锁还是不公平锁

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
复制代码

下面来看tryLock方法

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码

tryLock的含义是仅在调用时锁定未被另一个线程保持的情况下才获取锁定。第二个传入时间参数的含义是如果锁定在给定的等待时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。使用这两个方法时一定要理解清楚这两个方法的含义。

ReentrantReadWriteLock

下面来介绍JUClocks包内另外一个锁ReentrantReadWriteLock

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;
}
复制代码

可以看到它里面有两个锁:读锁和写锁。

它是在没有任何读写锁的情况下才能取得写入的锁。

它可以用于实现了悲观读取,即当执行中进行读取时,可能有另一个进程要写入的需求,为了保持同步就需要ReentrantReadWriteLock的读取锁定。但是如果读取很多,写入很少的情况下,使用ReentrantReadWriteLock可能会使写入线程遭遇饥饿,即写入线程长期处于等待状态。

Coding演示

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

}
复制代码

这里实现了对Map的读写操作的同步,通过对内部方法的封装使外部在调用方法时不需要考虑同步问题。

需要注意的是ReentrantReadWriteLock实现的是悲观读取,如果想获得写入锁时坚决不允许有任何读锁还保持着,即当所有读操作做完时才允许写操作,因此可能会造成写操作的线程饥饿。

StampedLock

StampedLock控制锁有三种方式,分别是:写,读,乐观读。

StampedLock的状态由版本和模式两个部分组成。锁获取方法返回的是一个数组作为票据(Stamp),用相应的锁状态来表示和控制相关的访问,输出0表示没有写锁被授权访问,在读锁上分为悲观锁和乐观锁。

Coding演示

@Slf4j
public class LockExample5 {

    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行线程数
     */
    public static int threadTotal = 200;

    public static int count = 0;

    private static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }

    }
}
复制代码

运行结果始终为5000,线程安全。

StampLock对吞吐量有巨大改进,特别是在读线程越来越多的场景下。

注意:除了sync,其他锁都要在使用完后释放锁。

总结

介绍了这么多种锁,我们来总结一下各个锁使用的场景

  • 当只有少量竞争者时sync是很好的选择。
  • 竞争者不少,但线程增长的趋势能够预估时,ReentrantLock适合。

sync由于是jvm自动解锁,所以肯定不会造成死锁,而其他锁可能因为使用不当造成死锁。

Written by Autu

2019.7.20

关注下面的标签,发现更多相似文章
评论