Java AQS 之 Condition

1,097 阅读8分钟

本文前置知识为 AQS,如不了解 AQS 可先阅读Java AQS 详解

示例

Condition 可以用在经典的生产者-消费者场景中,下面是 java.util.concurrent 的作者 Doug Lea 给出的例子

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依赖于 lock 来产生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

我们可以看到,在使用 Condition 时,必须先持有相应的锁。这个和 Object 类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait()notify()notifyAll() 方法

ConditionObject

翻看 ReentrantLocknewCondition() 源码会发现, 每调用一次 newCondition() 就返回了一个 ConditionObject 实例, 而 ConditionObject 则是 AbstractQueuedSynchronizer(AQS)的内部类

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    public Condition newCondition() {
        return sync.newCondition();
    }
    
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
}

接下来看一下 ConditionObject 的源码,会发现多了一个条件队列的概念,这里附一张简图,搭配源码食用更佳

await

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    static final class Node {
        // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
        volatile int waitStatus; 
        // prev 和 next 用于实现 CLH 队列双向链表
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        // nextWaiter 用于实现条件队列单向链表
        Node nextWaiter;
    }
    
    public class ConditionObject implements Condition, java.io.Serializable {
        /**
         * 在这里又引入了一个 condition queue 的概念,也就是条件队列
         * 1.条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的
         * 2.ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2
         * 注意:ConditionObject 只有两个属性 firstWaiter 和 lastWaiter
         * 3.每个 Condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,
         * 然后阻塞在这里,不继续往下执行,条件队列是一个单向链表
         * 4.调用 condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,
         * 等待获取锁,获取锁后 await 方法才能返回,继续往下执行
         * 注意:为便于理解,2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法
         */
        
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        
        /**
         * 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
         * 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
         */
        public final void await() throws InterruptedException {
            // 判断中断状态
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加到 condition 条件队列中
            Node node = addConditionWaiter();
            // 释放锁,返回值是释放锁之前的 state 值
            // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 如果不在阻塞队列中,注意了,是阻塞队列
            // 这里退出循环有两种情况,之后再仔细分析
            // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
            // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
            while (!isOnSyncQueue(node)) {
                // 线程挂起
                LockSupport.park(this);
                /* 
                 * interruptMode: REINTERRUPT(1),THROW_IE(-1),0
                 * REINTERRUPT 代表 await 返回的时候,需要重新设置中断状态
                 * THROW_IE 代表 await 返回的时候抛出 InterruptedException
                 * 0 说明在 await 期间,没有发生中断
                 *
                 * 这里说明一下为什么需要重新中断
                 * private int checkInterruptWhileWaiting(Node node) {
                 *   return Thread.interrupted() ?
                 *      (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 
                 *      0;
                 * }
                 * 调用 Thread.interrupted() 方法,如果当前线程已经处于中断状态,那么该方法返回 true,同时会将中断状态重置为 false
                 * 所以才需要重新中断
                 * 线程中断会调用这个方法,如果需要的话,将这个已经取消等待的节点转移到阻塞队列
                 * final boolean transferAfterCancelledWait(Node node) {
                 *     // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断
                 *     // 因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
                 *     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                 *         // 即使中断了也会放入阻塞队列
                 *         enq(node);
                 *         return true;
                 *     }
                 *     // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
                 *     // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
                 *     while (!isOnSyncQueue(node))
                 *         Thread.yield();
                 *     return false;
                 * }
                 */
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 被唤醒后,将进入阻塞队列,等待获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        
        /** 将当前线程对应的节点入队,插入队尾 */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果条件队列的最后一个节点取消了,将其清除出去
            // Node.CONDITION indicate thread is waiting on condition
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 将当前线程包装为 Node,并指定 waitStatus=Node.CONDITION
            // 从上面代码可以看出如果 waitStatus!=Node.CONDITION 会被移出条件队列
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
        
        /** 条件队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                // 如果 Node waitStatus!=Node.CONDITION,就移出条件队列
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
        
        /** 完全释放独占锁,不是一次一次释放(重入) */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    // 如果失败抛出异常
                    throw new IllegalMonitorStateException();
                }
            } finally {
                // 如果释放失败,还会把线程所在 Node waitStatus 设为取消状态
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
        
        /** 判断节点是否已经移入 CLH 阻塞队列 */
        final boolean isOnSyncQueue(Node node) {
            // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
            // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
            // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
            if (node.next != null) // If has successor, it must be on queue
                return true;
            // 从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
            // 这里不能通过 node.prev != null 来判断 node 是否在阻塞队列
            // 因为 AQS 的入队方法,首先将 node.prev 指向 tail
            // 然后是 CAS 操作将自己设置为新的 tail,但是这次 CAS 是可能失败的
            return findNodeFromTail(node);
        }
        
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
        
    }
}

signal

public final void signal() {
    // 调用 signal 方法的线程必须持有当前的独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 唤醒等待最久的线程
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/** 
 * 从条件队列队头往后遍历,找出第一个需要转移的 Node 
 * 有些线程可能会取消排队,但还在条件队列中
 */
private void doSignal(Node first) {
    do {
        // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
        // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
        first.nextWaiter = null;
    // transferForSignal 将节点从条件队列转移到同步队列
    // 如果失败,会执行 first = firstWaiter,即条件队列中 first 下一个节点
    // 依次类推,就是从条件队列队头开始往队尾遍历,直到找到符合条件的 Node
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * 将节点从条件队列转移到阻塞队列
 * true 代表成功转移
 * false 代表在 signal 之前,节点已经取消了
 */
final boolean transferForSignal(Node node) {
    // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
    // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
    // 否则,将 waitStatus 置为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): 自旋进入阻塞队列的队尾
    // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    /*
     * ws > 0 (即 = 1)说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
     * 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
     * 节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
     * 如果有疑问建议再看一下 AQS 源码
     * 正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中
     * ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true
     * 所以一般也不会进去 if 语句块中唤醒 node 对应的线程
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果前驱节点 waitStatus CAS 设置失败,就唤醒当前线程
        LockSupport.unpark(node.thread);
    return true;
}


参考:

~~~~~~~~一行一行源码分析清楚 AbstractQueuedSynchronizer (二)