让人抓头的Java并发(三) 强大的AQS!

2,904 阅读8分钟

前两篇文章介绍了多线程和线程池的一些概念,在这一篇终于要介绍JUC中非常重要的一个基础组件AQS了!

AQS简介

AQS是队列同步器AbstractQueuedSynchronizer的简称,是用来构建锁和其他队列同步组件(ReentrantLock、CountDownLatch、Semaphore等)的基础框架。它使用一个volatile修饰的int类型成员变量表示同步状态,使用一个静态内部类Node构成的队列(双向链表)实现获取同步资源线程的排队工作。AQS中使用了模板方法模式,子类通过继承AQS并重写它的部分方法来管理同步状态,一般都将子类作为自定义同步组件的静态内部类。

AQS的实现分析

同步状态

/**
 * 同步状态
 */
private volatile int state;
/**
 * 返回同步状态的当前值
 */
protected final int getState() {
    return state;
}
/**
 * 设置同步状态的值
 */
protected final void setState(int newState) {
    state = newState;
}
/**
 * 利用CAS操作更新当前的status值
 */
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

status字段由volatile修饰保证了其可见性,利用CAS操作保证了修改status操作的原子性。因此在AQS中它是线程安全的(并不意味这volatile修饰的变量是线程安全的,只对单个变量读/写具有原子性,所以这里更新操作需要由CAS保证原子性)。status的值可以被用来控制这个同步器是属于独占模式(小于等于1)还是共享模式(大于1)。

独占模式:同一时刻最多只有一个线程获取到同步状态;
共享模式:同一时刻会有多个线程获取到同步状态;

AQS中提供了一些模板方法控制同步状态,同步组件需要使用AQS提供的模板方法来实现同步,重写的关键方法有:

方法 说明
tryAcquire(int arg) 独占式获取同步状态
tryAcquireShared(int arg) 共享式获取同步状态
tryRelease(int arg) 独占式释放同步状态
tryReleaseShared(int arg) 共享式释放同步状态

同步队列

AQS依赖同步队列来完成同步状态的管理,当前线程获取同步状态失败时会被构造成一个Node节点并被加入同步队列中(通过CAS保证线程安全),同时会阻塞当前线程。当同步状态释放时,会把队列首节点中的线程唤醒重新尝试获取同步状态;

private transient volatile Node head;
private transient volatile Node tail;
/** CAS设置头节点.只被enq()方法调用.后续很多节点入队出队操作都是使用enq()方法*/
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** CAS设置尾节点.只被enq()方法调用.*/
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
    
static final class Node {
    /** 标记表示节点正在共享模式中等待 */
    static final Node SHARED = new Node();
    /** 标记表示节点正在独占模式中等待 */
    static final Node EXCLUSIVE = null;
    /**
    * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,
    * 节点进入该状态将不会变化
    */
    static final int CANCELLED =  1;
    /** 
    * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消
    * 会通知后继节点,使后继节点的线程得以运行
    */
    static final int SIGNAL    = -1;
    /** 
    * 节点在等待队列中,当其他线程对condition调用了signal()后,
    * 该节点会从等待队列转移到同步队列中
    */
    static final int CONDITION = -2;
    /** 表示下一次共享式同步状态获取将会被传播 */
    static final int PROPAGATE = -3;
    /** 等待状态*/
    volatile int waitStatus;
    /** 前驱节点*/
    volatile Node prev;
    /** 后继节点*/
    volatile Node next;
    /** 获取同步状态的线程*/
    volatile Thread thread;
    /** 等待队列Condition中的后继节点,如果式共享模式则是SHARED常量*/
    Node nextWaiter;
    
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}

同步状态的获取流程(独占模式)

关于获取同步状态的源码部分这里简单分析下。一图胜千言,给大家一个流程图让大家了解下这个大致的过程。

源码分析

acquire(int arg)方法用来获取同步状态

/**
 * 以独占模式获取同步状态,忽略中断.通过至少调用tryAcquire实现,成功返回.
 * 否则线程排队,可能重复阻塞和解除阻塞,调用tryAcquire直到成功。
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

其中tryAcquire()方法对于不同的同步组件有不同的实现方式,例如ReentrantLock中的 公平锁和非公平锁的实现方式就略有不同。本篇不详细分析,主要功能是获取同步状态,成功返回true,否则返回false。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试enq方法的快速路径.失败后备份到完整enq方法
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 如果队列为空,初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter(Node.EXCLUSIVE)方法用于构造一个独占模式的Node节点并尝试使用CAS操作将其加入到等待队列作为尾节点;如果加入失败则进入enq()方法通过在死循环中CAS来设置为尾节点直到成功。

/**
* 对于已经在同步队列中的线程,以独占不间断模式获取。由条件等待方法使用以及获取。
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是头节点则尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 如果前驱节点不是头节点则阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQueued()方法中,当前线程通过死循环尝试获取同步状态,而且只有当前驱节点是头节点的时候才能尝试获取同步状态。头节点的线程释放同步状态之后会唤醒其后继节点,这里在获取同步状态之前检查前驱是否为头节点是为了防止过早的通知(等待线程由于中断被唤醒)

同步状态的释放流程(独占模式)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果队列中头节点不为null并且该节点不是处于初始状态(waitStatus为0)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 如果状态为负,请尝试以预期信号清
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 如果后继节点为null或者waitStatus>0(为1,从同步队列中取消了)
     * 则继续向后寻找节点
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

该方法通过调用tryRelease(arg)释放同步状态,具体根据不同组件实现,之后会唤醒它的后继节点使后继节点尝试获取同步状态。

Condition简介

Condition接口定义了等待/通知两种类型的方法,调用这些方法的前提是需要获取到Condition对象关联的锁(类似于调用Object的wait、notify需要先获取Synchronized锁)。ConditionObject是AQS中的一个内部类,实现了Condition接口,有两个Node类型的成员变量firstWaiter、lastWaiter,实际上就是个双向队列。

等待

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 加入等待队列 
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 进入等待状态
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被唤醒,重新尝试进入同步队列获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果被中断则抛出异常
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用await()方法会使当前线程加入等待队列并释放同步状态,唤醒同步队列中的后继节点,然后当前线程进入等待状态。当处于等待队列的该节点被唤醒(signal())后重新尝试获取同步状态,如果是由于中断被唤醒则抛出异常。

通知

public final void signal() {
    // 判断当前线程是否获取了锁 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 唤醒等待队列中的第一个节点
    if (first != null)
        doSignal(first);
}

调用该方法必须要先获取同步状态,该方法会将等待队列中的第一个节点移动到同步队列然后唤醒它,被唤醒的节点会重新尝试获取同步状态

总结

上面我简单分析了JUC中的重要工具AQS,其实对于AQS的同步状态的获取和释放就相当于进入和退出Synchronized同步。AQS的等待队列相当于Synchronized的锁等待池。AQS中的Condition相当于Synchronized的对象等待池。Condition的await()和signal()相当于Object的wait()和notify()。







参考:《Java并发编程的艺术》