1 简介
Condition是用于当线程的等待/唤醒操作,实现是AbstractQueuedSynchronizer.ConditionObject
,示例代码:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
condition.await();
// operations
condition.signal();
}finally {
lock.unlock();
}
注意:
- Condition对象通过
Lock.newCondition()
创建; - 必须在Lock.lock()后才能进行
Condition.await
操作,原因文章后面会讲。
Condition基于AQS实现,AQS可以参考AQS源码分析及核心方法解析。
2 属性
private transient Node firstWaiter; // 等待队列的第一个节点
private transient Node lastWaiter; // 等待队列的最后一个节点
Condition的Node,只有CONDITION/CANCELLED两种状态。
3 内部方法
3.1 addConditionWaiter
功能:使用 当前线程 和 waitStatus值Node.CONDITION
构造新节点,并添加到条件队列的队尾。
返回:添加的新节点
流程:
- 1 如果
lastWaiter==null
,说明队列未初始化,则firstWaiter = node
; - 2 如果原来的
lastWaiter!=null
,则原来的lastWaiter.nextWaiter = node
; - 3 最后,
lastWaiter = node
,将lastWaiter字段指向node。
private Node addConditionWaiter() {
Node t = lastWaiter;
/*
* 如果lastWaiter不是CONDITION状态,则清除掉条件队列中所有CANCELLED状态的节点。
* 清理后lastWaiter会变。
*/
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建节点,节点初始waitStatus是CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/*
t指向了当前的lastWaiter
如果 t == null 的话,则firstWaiter指向新节点;
否则把 t.nextWaiter指向新节点,也就是把新节点加都队尾。
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 最后把属性lastWaiter指向新节点node
lastWaiter = node;
return node;
}
3.2 unlinkCancelledWaiters
功能:清除掉条件队列中所有被取消的节点;
流程:
- 1 从firstWaiter开始,将整个链表中
t.waitStatus != Node.CONDITION
的节点移除掉; - 2 节点移除后,将其前置节点的
nextWaiter
指向后置节点。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
/*
*当前节点状态不是CONDITION:
*当前节点的nextWaiter设置为null。
* 如果trail是空,则将firstWaiter指向之前保存的t.nextWaiter,
* 否则将trail.nextWaiter指向之前保存的t.nextWaiter。
*/
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
3.3 fullyRelease
功能:用当前的当前state作为参数,调用release
(目的是释放锁)。
返回:释放之前的state的值。
流程:
- 1 通过
getState()
获取当前state; - 2 执行
release(int)
; - 3 成功则返回释放之前的state值,失败则
node.waitStatus = Node.CANCELLED
。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3.4 isOnSyncQueue
功能:判断节点是否在同步队列中。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// waitStatus是CONDITION,或prev是null
return false;
if (node.next != null)
// 根据文章前面对Node的描述,next只会在同步队列中有值。
return true;
/*
因为CAS操作替换值的时候可能会失败,所以有可能出现:
node.prev不为null,但是没在同步队列中。
所以需要从队尾向前再遍历一遍。
*/
return findNodeFromTail(node);
}
/**
从同步队列的tail开始向前遍历,查找是否有节点node。
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
3.5 transferForSignal
功能:将一个Condition队列节点,转成同步队列节点,然后加入到同步队列中,最后将该节点在同步队列中的前置的waitStatus
CAS为Node.SIGNAL
。
返回:参数node是取消状态(waitStatus != Node.CONDITION)返回false,否则返回true。
流程:
- 1 先把节点的waitStatus从
Node.CONDITION
设置为0,设置失败则返回false; - 2 调用
enq
方法将该节点加入同步队列的队尾,并返回该节点在同步队列中的前置节点; - 3 如果前置节点的waitStatus是CANCELLED状态,则唤醒节点node所持线程;
- 4 否则,对前置节点的
waitStatus
CAS为Node.SIGNAL
,失败则唤醒节点node所持线程。
final boolean transferForSignal(Node node) {
// 将node.waitStatus通过CAS由Node.CONDITION置换为0,失败则返回false。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
node加入同步队列;
返回值p是同步队列的原队尾,即现在node在同步队列中的前置节点。
*/
Node p = enq(node);
int ws = p.waitStatus;
/*
ws是node在同步队列中的前置节点的waitStatus值;
ws > 0表示p是CANCELLED状态;
ws <= 0时执行CAS,将p的状态置换为Node.SIGNAL。
如果p是CANCELLED状态,或 CAS为Node.SIGNAL失败,则unpark当前节点的线程。
*/
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
3.6 doSignal
功能:通过调用transferForSignal
:将参数first
转换成同步队列节点(waitStatus==0
),加入到同步队列中,同时将其在同步队列中的前置节点waitStatus置为Node.SIGNAL
。
流程:
- 1 将参数first的nextWaiter赋给firstWaiter备用;
- 2 调用
transferForSignal(first)
; - 3 如果 transferForSignal成功 则结束;否则将之前firstWaiter(指向之前的first.nextWaiter)赋给first重新循环。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
4 await
4.1 执行过程
- 1 执行
addConditionWaiter()
创建新的CONDITION状态节点,加入条件队列队尾,并返回该节点; - 2 执行
fullyRelease
释放掉当前state的值(即置为0),并将释放前的state值保存到savedState;失败的话,新节点waitStatus置为CANCELLED; - 3 通过
isOnSyncQueue
,判断是否在同步队列中,不在则执行park
; - 4 被其他线程unpark后,用之前保存savedState的值进行
acquireQueued(node, savedState)
。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加新节点到队尾
Node node = addConditionWaiter();
// 释放state
int savedState = fullyRelease(node);
int interruptMode = 0;
// 节点不在同步队列中
while (!isOnSyncQueue(node)) {
//park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
4.2 Lock.lock后才能await?
Condition.await
调用的fullyRelease
中,会调用release
,进而会调用tryRelease
,ReentrantLock中tryRelease
会进行如下判断:
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
而在执行Lock.lock()之前,getExclusiveOwnerThread()
返回null,所以:
执行
Lock.lock
之后才能执行Condition.await
。
5 signal
- 判断当前线程是否独占了锁,是则执行
doSignal
; isHeldExclusively
在ReentrantLock
中的实现是return getExclusiveOwnerThread() == Thread.currentThread();
。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}