AbstractQueuedSynchronizer整体解析

930 阅读5分钟

AbstractQueuedSynchronizer整体解析

前言

在此之前,我们深入源码分析过ReentrantLock系列,在那里就探讨过AbstractQueuedSynchronizer(下称AQS)类,称其是同步组件乃至整个并发包的基础类。这篇文章就深入AQS,从AQS的角度了解同步器以及ReentrantLock、ReentrantReadWriteLock等的实现机制,实现自定义的同步组件,以窥探整个同步框架的全貌。

AQS及同步器整体介绍

有关类字段及方法的介绍,在ReentrantLock原理探究(一)就已说过,今天我们换种方式,就围绕两个问题介绍AQS:

  1. AQS是干什么的?
  2. 它是怎样做到的?

回答出这两个问题,我们就从整体性上理解了AQS,乃至整个同步器组件。

AQS类源码注释说得很多,重点有,该类是一个用于构建锁或其他同步器的基础框架,使用一个int的成员变量表示同步状态。另外,还有一个内置的先进先出的队列可储存竞争同步状态时排队的线程。

从洋洋洒洒的类注释及其他资料,我们不难还原出AQS要做的事:有一个共享资源state(int类型的变量),各个线程去竞争这个资源,竞争到的拥有资源,去处理自己的逻辑;没竞争到去排队(进入先进先出队列),等拥有资源的线程释放共享资源后,队列中线程的再去竞争。

AQS基本实现了以上功能,相当于搭好了整体框架,我们需要实现哪个具体的功能,重写AQS某些指定方法即可。下面是两个同步器类实现的大体思路

  1. ReentrantLock,是排他锁,某个线程获取锁后其他线程就会阻塞直至锁的释放。共享资源state初始值为0,表示资源未被占有。某线程访问并设置state为1,表示该线程占有了锁。当其他线程读取到state不为0后进入队列等待,直到占有锁的线程将其设为0后,队列线程才会得到通知,重新竞争锁。(事实上ReentrantLock作为可重入锁,占有锁的线程再次进入锁会使state加1,退出一次state减1)

  2. CountDownLatch,共享锁。可用于控制线程执行、结束的时机。如我们想要主线程在2个子线程执行完后再结束,这时使用CountDownLatch通过构造函数将共享变量state设为2,将主线程锁住,每个子线程结束后state减一,state为0后表示两子线程执行完毕,此时主线程才得以释放。

也即是说,通过AQS,我们将能很简单的实现同步的要求。这也是模板方法模式的运用。

AQS主要模板方法如下

方法 描述
acquire / acquireInterruptibly 独占式获取同步状态,若获取失败,将进入同步队列。后者与前者的区别在于,后者能在同步队列中响应中断
acquireShared / acquireSharedInterruptibly 共享式获取同步状态,后者能响应中断
release 独占式释放同步状态,成功后将同步队列的第一个线程唤醒
releaseShared 共享式释放同步状态

需要子类实现的方法如下

方法 实现思路
tryAcquire 独占式获取同步状态,实现该方法需要查询当前状态,并判断状态是否符合预期(根据各子类不同功能判断条件各异),然后再根据CAS设置同步状态
tryRelease 独占式释放同步状态
tryAcquireShared 共享式获取同步状态,若返回值大于等于0,表示获取成功,否则表示失败
tryReleaseShared 共享式释放同步状态
isHeldExclusively 在独占模式下,同步状态是否被占用

AQS会在合适的时候调用子类的方法,以实现不同功能。以acquire 方法为例

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&   //会调用子类的tryAcquire方法,实现不同的acquire含义
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
  }

另外,AQS还提供了有关获取设置state状态的有关方法,我们在自定义子类中会用到。

方法 描述
getState 获取同步状态
setState(state) 设置同步状态
compareAndSetState(except,update) 使用CAS设置同步状态,只有当同步状态值为except时,才将其设置update

一个简单的锁

根据上面提到的,我们来自制一个独占类型的锁。

根据AQS的建议,实现AQS的类最好为同步器的内部类。下面是自制锁MyLock代码示例


public class MyLock implements Lock {

    private Sync sync = new Sync();

    //AQS的子类,由于是独占锁,实现tryAcquire和tryRelease两方法
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            //若状态为1,说明有其他线程已占有锁,直接返回false
            if(getState()==arg){
                return false;
            }
            //若状态为0,将其设为1,表示占有锁
            return compareAndSetState(0, arg);
        }

        @Override
        protected boolean tryRelease(int arg) {
            //设置状态为0,表示释放锁
            setState(0);
            return true;
        }
    }

    //其他Lock接口方法,直接调用Sync类实现

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }


    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return null;
    }
}


这样我们就实现了一个简单的锁。不过这个锁相比ReentrantLock来说,没有实现可重入性(也没有实现关联条件Condition)。也就是说它会被自己锁死:当某个线程在获取锁后再次尝试获取锁,会导致死锁。不过,实现类似i++的同步倒是可以做到的。

       public void run() {
            myLock.lock();
            try {
                total++;
            } finally {
                myLock.unlock();
            }
        }

对于可重入锁,需要记住持有锁的线程,当加锁时,判断当前线程是否持有锁,若持有,直接进入同步块,同时将state加1,当试图释放锁时,将state减1。若state减到0,释放锁。其他过程与其他一致。感兴趣你可以试试。

对于AQS的大致理解就到这了,有时间我们再深入源码分析其具体实现。