简单的了解一下AQS吧

1,240 阅读7分钟

什么是AQS

AQS,即AbstractQueuedSynchronizer,是一套定义了多线程访问共享资源的同步器框架。在JDK的并发包中很多类都是基于AQS进行实现的,比如ReentrantLockCountDownLatch等。

AQS中的设计模式

如果单单只是看AQS类中的代码的haul可能会产生很多疑惑,因为类中很多方法都是只有方法体,具体的实现需要到子类中才能看到。

模板方法模式

在我们平常的开发中会经常遇到一个问题,当我们接到一个需求时,在整理大体思路时会很清晰。但是当实际实现的时候会发现问题很多,有些步骤实现是没有办法确定下来的。会根据不同的需求进行更改。

这种逻辑流程确定,但是具体实现可能不同的问题可以通过模板方法模式来解决。

所谓的模板方法模式就是定义一个操作的流程骨架,确定调用流程。但是具体的实现则交给子类去完成。模板方法模式就是利用了面向对象中的多态特性。

在模板方法模式中有两个重要的角色,一个是抽象模板类,另一个就是具体的实现类。

抽象模板类

抽象模板类用于定义业务流程,在该类中定义了一系列完成业务所需的方法。能够确定的方法可以在抽象类中实现逻辑。不能确定的只是定义好方法,具体的实现由子类完成。

以AQS举例,AbstractQueuedSynchronizer被定义为抽象类,其中一部分方法只是定义了方法体:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

尽管这部分方法并没有提供具体的实现,但是AQS中的其他方法还是直接调用了该方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

跟句抽象类的特性,如果要使用这些方法的话就必须在子类继承AQS并实现这些抽象方法。这样的方法类被称为模板类。

实现类

模板类的抽象方法的逻辑实现是在子类中完成的,不同的子类可以根据具体的需求进行个性化的实现。

比如ReentrantLock中Sync和FairSync对于tryAcquire的实现:

Sync:

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

FairSync:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //这里多了一次是否存在等待更长时间线程的判断
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

这样的类被称为实现类。

AQS中的模板方法

AQS就是典型的应用模板方法模式的例子,如果我们要通过AQS来实现一个同步类。那么我们需要实现以下方法:

tryAcquire(int) 
tryRelease(int) 
tryAcquireShared(int) 
tryReleaseShared(int) 
isHeldExclusively() 

部分参数解析

state

state参数是非常重要的一个参数,AQS的锁状态就是依赖于改参数实现的。

AQS中对锁的操作是利用CAS进行实现,而cas主要操作的对象就是state参数。当state=0时表示可以获取锁,而当state!=0时则表示已经进行了加锁操作。

可重入锁的实现也依赖于该参数,当持有锁的线程再次获取一次锁时便将state的值加一,而每一次释放一次锁则进行减一操作,只有当state=0时才算是释放锁完毕。

Node

static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;
}

Node用于保存获取锁失败时

Node.SHARED和Node.EXCLUSIVE

在AQS的具体实现中存在两种不同模式的锁:排他锁和共享锁

一般共享锁主要用于读操作,表示读操作可以是多个线程同时进行,而不会阻塞;排他锁主要用于写操作,会进行阻塞

而排他锁和共享锁的实现就依赖于Node.SHARED和Node.EXCLUSIVE区分。比如ReentrantReadWriteLock

waitStatus

waitStatus用于表示当前节点所处的状态。

  • 初始状态:节点初始状态值被初始化为0,如果是通过condition注册的节点其初始状态为-2(CONDITION)
  • CANCELLED:static final int CANCELLED = 1;由于超时或者中断等原因使得当前节点被标记位取消状态。一般来说被标记为取消状态的节点不会再去竞争锁并且不能转换为其他状态。
  • SIGNAL:static final int SIGNAL = -1;当前节点的后继节点通过park被阻塞(或者将要被阻塞)。那么在当前节点释放或者取消时需要通过unpark取消阻塞。
  • CONDITION:static final int CONDITION = -2;将节点放在condition队列中是需要标识其状态为CONDITION。
  • PROPAGATE:static final int PROPAGATE = -3;该状态值用于在共享状态下,当共享状态的锁被释放后,该操作会被传播到其他节点。

prev next

prev和next分别用于记录前驱节点和后继节点

重要方法解析

tryAcquire

protected boolean tryAcquire(int arg);

tryAcquire字面意思很明确,就是尝试获取锁。获取锁成功则返回true,获取锁失败则将该线程放入等待队列中,等待占用资源的线程被释放。

在JDK中明确定义tryAcquire方法用于获取的处于独占模式下的锁。如果不是独占模式则抛出异常UnsupportedOperationException

该方法需要被重写。

该方法共享模式版本为protected int tryAcquireShared(int arg).

tryRelease

protected boolean tryRelease(int arg);

该方法用于在独占模式下通过cas尝试设置state状态值,用于释放锁操作。

修改值成功则返回true。如果不是独占模式则抛出异常UnsupportedOperationException

该方法需要被重写。

该方法的共享模式方法为protected boolean tryReleaseShared(int arg)

isHeldExclusively

该方法用于来判断是否当前线程正在以独占模式进行同步操作。

setState和compareAndSetState

setState和compareAndSetState两个方法都是对state参数的值进行设置。

不同之处在于compareAndSetState主要用于获取锁时修改状态值,因为获取锁时存在竞争问题所以需要原子操作获取。

而setState操作用于在释放锁是修改state的值,释放锁时只有持有锁的线程会进行释放,不存在竞争问题,不需要原子操作。

动手实现一个同步类

现在我们来实现一个我们自己的同步类,一个不可重入的独占锁。

public class MyLock implements Lock {

    static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            //这里只有当state=0时才能获取锁 表示该同步类不可重入
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if(getState()!=1){
                //无法再被释放
                throw new IllegalMonitorStateException();
            }
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState()==1 || 
                    getExclusiveOwnerThread()==Thread.currentThread();
        }

        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();
    
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(0);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

即使我们并不知道AQS的内部实现,只需要了解AQS中的几个方法作用并在子类中重写这些方法就能设计出一个简单的同步类。