本文前置知识为 AQS,如不了解 AQS 可先阅读Java AQS 详解
示例
Condition 可以用在经典的生产者-消费者场景中,下面是 java.util.concurrent
的作者 Doug Lea 给出的例子
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生产
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
} finally {
lock.unlock();
}
}
// 消费
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
return x;
} finally {
lock.unlock();
}
}
}
我们可以看到,在使用 Condition 时,必须先持有相应的锁。这个和 Object
类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait()
, notify()
或 notifyAll()
方法
ConditionObject
翻看 ReentrantLock
的 newCondition()
源码会发现,
每调用一次 newCondition()
就返回了一个 ConditionObject
实例,
而 ConditionObject
则是 AbstractQueuedSynchronizer
(AQS)的内部类
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
}
接下来看一下 ConditionObject
的源码,会发现多了一个条件队列的概念,这里附一张简图,搭配源码食用更佳
await
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
static final class Node {
// 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile int waitStatus;
// prev 和 next 用于实现 CLH 队列双向链表
volatile Node prev;
volatile Node next;
volatile Thread thread;
// nextWaiter 用于实现条件队列单向链表
Node nextWaiter;
}
public class ConditionObject implements Condition, java.io.Serializable {
/**
* 在这里又引入了一个 condition queue 的概念,也就是条件队列
* 1.条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的
* 2.ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2
* 注意:ConditionObject 只有两个属性 firstWaiter 和 lastWaiter
* 3.每个 Condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,
* 然后阻塞在这里,不继续往下执行,条件队列是一个单向链表
* 4.调用 condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,
* 等待获取锁,获取锁后 await 方法才能返回,继续往下执行
* 注意:为便于理解,2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法
*/
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
* 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
*/
public final void await() throws InterruptedException {
// 判断中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加到 condition 条件队列中
Node node = addConditionWaiter();
// 释放锁,返回值是释放锁之前的 state 值
// await() 之前,当前线程是必须持有锁的,这里肯定要释放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果不在阻塞队列中,注意了,是阻塞队列
// 这里退出循环有两种情况,之后再仔细分析
// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
while (!isOnSyncQueue(node)) {
// 线程挂起
LockSupport.park(this);
/*
* interruptMode: REINTERRUPT(1),THROW_IE(-1),0
* REINTERRUPT 代表 await 返回的时候,需要重新设置中断状态
* THROW_IE 代表 await 返回的时候抛出 InterruptedException
* 0 说明在 await 期间,没有发生中断
*
* 这里说明一下为什么需要重新中断
* private int checkInterruptWhileWaiting(Node node) {
* return Thread.interrupted() ?
* (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
* 0;
* }
* 调用 Thread.interrupted() 方法,如果当前线程已经处于中断状态,那么该方法返回 true,同时会将中断状态重置为 false
* 所以才需要重新中断
* 线程中断会调用这个方法,如果需要的话,将这个已经取消等待的节点转移到阻塞队列
* final boolean transferAfterCancelledWait(Node node) {
* // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断
* // 因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
* if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
* // 即使中断了也会放入阻塞队列
* enq(node);
* return true;
* }
* // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
* // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
* while (!isOnSyncQueue(node))
* Thread.yield();
* return false;
* }
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,将进入阻塞队列,等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/** 将当前线程对应的节点入队,插入队尾 */
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果条件队列的最后一个节点取消了,将其清除出去
// Node.CONDITION indicate thread is waiting on condition
if (t != null && t.waitStatus != Node.CONDITION) {
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程包装为 Node,并指定 waitStatus=Node.CONDITION
// 从上面代码可以看出如果 waitStatus!=Node.CONDITION 会被移出条件队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/** 条件队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 */
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果 Node waitStatus!=Node.CONDITION,就移出条件队列
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/** 完全释放独占锁,不是一次一次释放(重入) */
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 如果失败抛出异常
throw new IllegalMonitorStateException();
}
} finally {
// 如果释放失败,还会把线程所在 Node waitStatus 设为取消状态
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/** 判断节点是否已经移入 CLH 阻塞队列 */
final boolean isOnSyncQueue(Node node) {
// 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
// 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
// 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
if (node.next != null) // If has successor, it must be on queue
return true;
// 从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
// 这里不能通过 node.prev != null 来判断 node 是否在阻塞队列
// 因为 AQS 的入队方法,首先将 node.prev 指向 tail
// 然后是 CAS 操作将自己设置为新的 tail,但是这次 CAS 是可能失败的
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
}
}
signal
public final void signal() {
// 调用 signal 方法的线程必须持有当前的独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒等待最久的线程
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* 从条件队列队头往后遍历,找出第一个需要转移的 Node
* 有些线程可能会取消排队,但还在条件队列中
*/
private void doSignal(Node first) {
do {
// 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
// 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
first.nextWaiter = null;
// transferForSignal 将节点从条件队列转移到同步队列
// 如果失败,会执行 first = firstWaiter,即条件队列中 first 下一个节点
// 依次类推,就是从条件队列队头开始往队尾遍历,直到找到符合条件的 Node
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 将节点从条件队列转移到阻塞队列
* true 代表成功转移
* false 代表在 signal 之前,节点已经取消了
*/
final boolean transferForSignal(Node node) {
// CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
// 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
// 否则,将 waitStatus 置为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq(node): 自旋进入阻塞队列的队尾
// 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
/*
* ws > 0 (即 = 1)说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
* 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
* 节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
* 如果有疑问建议再看一下 AQS 源码
* 正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中
* ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true
* 所以一般也不会进去 if 语句块中唤醒 node 对应的线程
*/
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前驱节点 waitStatus CAS 设置失败,就唤醒当前线程
LockSupport.unpark(node.thread);
return true;
}
参考: