1 概述
AQS提供了一个基于FIFO队列实现锁和同步器的基本框架。
j.c.u中使用AQS的类有:
- CountDownLatch.Sync
- ThreadPoolExecutor.Worker
- ReentrantLock.FairSync/NonfairSync
- ReentrantReadWriteLock.FairSync/NonfairSync
- Semaphore.Sync
官方文档:docs.oracle.com/javase/8/do…
2 AQS如何使用
2.1 相关概念
state:同步状态值,用于子类中实现状态记录;
acquire:修改state,可以理解为获取锁;
release:与acquire相反操作的修改state,可以理解为释放锁。
predecessor:前驱,successor:后继
Node.EXCLUSIVE:独占模式, Node.SHARED:共享模式
2.2 实现约定
使用AQS实现一个锁或同步器,需要依据以下几点去实现:
-
1 同步器需要以一个单独的数字表示状态。
-
2 同步器需要定义一个继承AQS的内部类去实现同步属性。
-
3 内部类继承AQS后,必须根据需要实现
tryAcquire*
/tryRelease*
去改变state。 -
4 有exclusive mode(默认)和shared两种模式:
- exclusive mode:独占模式,其他线程尝试acquire不会成功;
- shared mode:共享模式,多个线程尝试acquire都会成功。
-
5 一般只支持其中一种模式即可,但有特例:ReadWriteLock同时支持两种模式。
-
6 AbstractQueuedSynchronizer.ConditionObject 可以在子类中作为Condition的实现使用。
2.3 实现步骤
2.3.1 独占锁
基本步骤如下:
- 1 创建继承AQS的内部类;
- 2 实现
tryAcquire
/tryRelease
; - 3 lock时调用
acquire
,unlock时调用release
; - 4 判断当前是否有任意线程获取锁时判断
getState() != 0
。
以java.util.concurrent.locks.ReentrantLock
实现公平锁(FairSync)为例:
1 创建继承AQS的内部类
abstract static class Sync extends AbstractQueuedSynchronizer{....}
static final class FairSync extends Sync{....}
2 FairSync实现
tryAcquire
/tryRelease
tryAcquire:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryRelease:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3 lock时调用
acquire
,unlock时调用release
// in ReentrantLock.FairSync
final void lock() {
acquire(1);
}
// in ReentrantLock
public void unlock() {
sync.release(1);
}
4 判断当前是否有任意线程获取锁时判断
getState() != 0
// in ReentrantLock.Sync
final boolean isLocked() {
return getState() != 0;
}
2.3.2 共享锁
基本步骤如下:
- 1 创建继承AQS的内部类;
- 2 实现
tryAcquireShared
/tryReleaseShared
; - 3 加锁时调用
acquireShared
,释放锁时调用releaseShared
。
以java.util.concurrent.CountDownLatch
为例:
1 创建继承AQS的内部类
private static final class Sync extends AbstractQueuedSynchronizer{....}
2 实现
tryAcquireShared
/tryReleaseShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3 加锁时调用
acquireShared
,释放锁时调用releaseShared
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
3 主要属性
FIFO队列,由Node节点组成的链表:
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
3.1 head
private transient volatile Node head;
功能:队列的头节点。
说明:
- 延迟初始化(在
enq
中初始化)。 - 除了初始化的时候,其他时候只有调用
setHead(Node)
时才会修改。 - head.waitStatus不会是CANCELLED状态。
相关方法:
setHead(Node node)
setHeadAndPropagate(Node node, int propagate)
:调用了setHead
,setHeadAndPropagate
在doAcquireShared*
方法中调用;compareAndSetHead(Node update)
仅在enq
方法中被调用过。
3.2 tail
private transient volatile Node tail;
功能:队列的尾节点。
说明:
- 延迟初始化(在
enq
中使用tail = head
初始化)。 - 仅当调用方法
enq(Node)
添加新的等待节点时才会修改。
相关方法:
enq(final Node node)
:将节点加入到队尾findNodeFromTail(Node node)
: 从tail开始查找节点compareAndSetTail(Node expect, Node update)
3.3 state
private volatile int state;
功能:用来标识同步状态的状态值。
相关方法:
getState()
setState(int newState)
compareAndSetState(int expect, int update)
4 Node
4.1 属性
4.1.1 waitStatus
volatile int waitStatus
:状态
- CANCELLED = 1 : 表示当前节点被取消。
当前节点被取消,比如线程park后timeout或interrupt,状态会变更为CANCELLED。
当一个节点状态变为CANCELLED后,不会再变成其他状态,其线程也不会再次被阻塞。
- SIGNAL = -1 : 表示当前节点的 后续节点的线程 需要被唤醒运行(unpark)。
说明当前节点的 后续节点 是block状态(或即将是block状态),即其线程被park了。
所以当前节点在release或cancel时,必须unpark它的后续节点。
为了避免竞争,acquire方法必须先表明需要一个信号,然后再尝试acquire,失败则阻塞。
- CONDITION = -2 :
表示当前节点的线程在condition队列中(ConditionObject类中设置),该节点直到其状态被设置为0时,才会转变成为同步队列中的节点。
这个状态只在
ConditionObject
中使用。
- PROPAGATE = -3 : 当前场景下后续的acquireShared将会无条件被传播。
releaseShared(共享式的释放同步状态)需要被传播给其他节点;该状态在 doReleaseShared方法中被设置(仅仅适用于头节点)以保证传播,即使其它操作介入也不会中断。
- 0 : 当前节点在同步队列中,等待acquire。
waitStatus默认值:
- 同步队列:0;
- condition队列:是CONDITION(-2)。
4.1.2 prev
volatile Node prev
:前驱节点,只对非ConditionObject的节点定义启用。
相关方法:
Node predecessor()
:返回当前节点的前置节点;boolean hasQueuedPredecessors()
:查询同步队列中,是否有比自己更优先的线程在等待。
4.1.3 next
volatile Node next
:后继节点,只对非ConditionObject的节点定义启用。
相关方法:
-
void unparkSuccessor(Node node)
:unpark节点node的后继节点。
4.1.4 thread
volatile Thread thread
:当前节点的线程,Initialized on construction and nulled out after use.
4.1.5 nextWaiter
Node nextWaiter
:condition队列中的后继节点,只对ConditionObject内的节点定义启用,即当waitStatus==CONDITION时,才有值。
4.2 同步队列&Condition队列中Node的区别
比较 | 同步队列 | Condition队列 |
---|---|---|
使用的字段 | waitStatus/thread/prev/next/nextWaiter | waitStatus/thread/nextWaiter |
nextWaiter含义 | 表示模式,固定为Node.EXCLUSIVE 或 Node.SHARED | 下一个节点 |
waitStatus初始值 | 0 | CONDITION |
waitStatus可选值 | CANCELLED/SIGNAL/PROPAGATE/0 | CANCELLED/CONDITION |
5 AQS子类的实现约定:
- 根据需要,实现
tryAcquire*/tryRelease*
方法; - 使用
getState/setState(int)/compareAndSetState(int,int)
去监视或修改同步状态。
5.1 tryAcquire(int)
独占模式下尝试acquire。
方法实现时,需要查询当前状态是否允许acquire,然后再使用compareAndSetState进行acquire。
tryAcquire
在acquire(int arg)
中被调用。
如果调用失败,并且调用线程没在队列中,acquire方法会将该线程放入队列,直到被其他线程执行release释放。通常用来实现Lock.tryLock()。
返回值:true表示执行成功(state更新成功,成功acquire)。
5.2 tryRelease(int)
独占模式下,尝试修改state进行release。
返回值:true表示当前对象是一个完全release的状态,其他任何等待线程都可以尝试acquire。
5.3 tryAcquireShared(int)
共享模式下尝试acquire。
返回值:
- 负数:获取失败;
- 0 : 获取成功,但接下来共享模式下的请求不会成功,即没有剩余资源可用;
- 正数:获取成功,接下来共享模式下的acquire仍有可能成功, 即仍有剩余资源可用。 剩下的的等待线程必须检测对象的状态是否允许acquire。
5.4 tryReleaseShared(int)
共享模式下,尝试修改state进行release。
返回值:true表示share mode下release成功,即允许一个等待的acquire(shared or exclusive)成功进行。
5.5 isHeldExclusively()
当前线程是否以独占方式进行了同步(即状态被占用),也就是说,当前线程是否在独占资源。
这个方法在ConditionObject的方法中被调用,所以只有用到condition才需要去实现。
以上5个方法,默认的实现都是抛出 UnsupportedOperationException.
6 内部方法
内部方法有的用private修饰,有的没有。但都是在AQS内部使用的方法。
6.1 addWaiter
说明:
- 1 使用当前线程和给定模式创建节点;
- 2 新创建的节点加在队尾,成为新的tail;
返回:创建的新节点
参数mode:Node.EXCLUSIVE 或 Node.SHARED
private Node addWaiter(Node mode) {
// 创建节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// tail不为null,处理流程和enq(Node)中tail!=null时的处理流程一样。
node.prev = pred; //当前节点prev指向原来的tail
if (compareAndSetTail(pred, node)) { //将tail对象由原来的tail修改为当前新节点
pred.next = node; //原来的tail的next指向当前节点
return node;
}
}
// tail为null则调用enq方法。
enq(node);
return node;
}
6.2 enq
说明:
- 1 先判断tail是否为null;
- 2 如果
tail==null
:初始化head
,并tail = head
,然后重新循环; - 3 将新节点(参数node)加入到队尾;
返回:原来的tail(即新节点的前置节点)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail为null,初始化head/tail。
if (compareAndSetHead(new Node()))
tail = head; //tail指向head
} else {
node.prev = t; //当前节点prev指向原来的tail
if (compareAndSetTail(t, node)) { // 将tail对象由原来的tail修改为当前新节点
t.next = node; //原来的tail的next指向当前节点
return t;
}
}
}
}
这里有个疑问,如果tail是null,但是head不是null,则tail不是一直不会被初始化了吗?
初始化head的方法是compareAndSetHead
,而根据compareAndSetHead
里的注释,只在enq
中调用:
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
所以:head和tail,是在enq
方法中同时初始化的。
addWaiter
和enq
在tail!=null
时处理流程都是一样的,都是先将当前节点prev指向原来的tail,然后调用compareAndSetTail
将新节点设置为tail,然后将原来的tail.next指向新节点。
6.3 shouldParkAfterFailedAcquire
说明:
- 1 该方法的功能是acquire失败后,判断node是否应该block;
- 2 参数pred必须是参数node的前置。
判断逻辑:
如果node的前置pred的waitStatus == Node.SIGNAL,那么node应该block;
否则做以下处理:
- 1 如果
pred.waitStatus == Node.CANCELLED
:说明前置节点已经取消。持续向前查询,直到找到一个waitStatus <= 0
的节点,作为node的新前置,并把CANCELLED的几点从队列中取消; - 2 如果pred.waitStatus是0或PROPAGATE(不会是CONDITION):因为0是初始状态,PROPAGATE是传播状态,所以前置如果是这两种状态,都应该将前置的状态修改为SIGNAL,但不应该park。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 当前节点(即pred的下一个节点)已经是等待状态,需要一个release去唤醒它。所以node可以安全的park.
return true;
if (ws > 0) { // CANCELLED
/*
waitStatus > 0 说明是canceled状态。
前驱是canceled状态,则表明前驱节点已经超时或者被中断,需要从同步队列中取消。
持续向前找,直到找到waitStatus<=0的节点,作为node的前驱.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
前驱节点waitStatus为 0 或 PROPAGATE,置换为SIGNAL.
不会是CONDITION,因为CONDITION用在ConditionObject中。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6.4 unparkSuccessor
唤醒当前节点的后续节点。
处理流程:
- 1 如果节点不是CANCELLED,则将waitStatus设置为0;
- 2 如果后续节点是null或是CANCELLED状态,则对node之后第一个满足条件(waitStatus <= 0)的节点unpark。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // waitStatus设置为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
/*
node的后继节点是null或是CANCELLED状态,释放该后继节点;
并从tail开始向前找,找到离node最近的一个非CANCELLED(waitStatus <= 0)的节点,作为要unpark的节点。
*/
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// unpark后继节点。
if (s != null)
LockSupport.unpark(s.thread);
}
6.5 acquireQueued
说明:
- 1 独占模式下,已经在队列中的线程进行acquire;
- 2 在
acquire
方法或者ConditionObject.await*
方法中使用。
处理流程:
- 1 只有node的前驱是head时,才轮到node尝试获取锁,然后调用
setHead
将当前节点设置为head; - 2 否则检查node是否应该park(node的前置节点waitStatus是不是等于SIGNAL),是的话则park;然后等待其他线程对当前线程unpark。
返回:等待时是否中断过。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
只有前驱节点是head时,才轮到该节点争夺锁。
当前节点的前驱是head 并且 tryAcquire成功:
把当前要获取的节点设置为head,
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
如果当前节点不是head 或 tryAcquire失败,则检验当前节点的线程(此时即当前线程)是否应park。
parkAndCheckInterrupt会park当前线程,因此在这里block;将来有其他线程对当前线程unpark时,将继续此循环。
否则继续循环尝试acquire。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
6.6 doAcquireShared
说明:
- 1 共享模式下,已经在队列中的线程进行acquire;
- 2 在
acquireShared
方法使用。
处理流程:
- 1 通过调用
addWaiter(Node.SHARED)
想队尾添加一个新的节点; - 3 只有node的前驱是head时,才轮到node尝试获取锁,然后调用
setHeadAndPropagate
将当前节点设置为head,并向后传播tryAcquireShared
返回的值; - 3 否则检查node是否应该park(node的前置节点waitStatus是不是等于SIGNAL),是的话则park;然后等待其他线程对当前线程unpark。
private void doAcquireShared(int arg) {
// 创建Node.SHARED模式的新节点(添加在队尾)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 循环
for (;;) {
/*
1 获取当前节点的前置节点;
2 前置节点是head,则tryAcquireShared,如果成功,则将当前节点设置为head,并将结果向后传播。
*/
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置head值,并将tryAcquireShared结果向后传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 前置节点不是head,当前线程是否park;
// 如果park当前线程,将来有其他线程对当前线程unpark时,将继续此循环。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
和doAcquireShared
在判断处理节点时,流程都是一样的;不同点是:
acquireQueued
:使用setHead
设置头节点;doAcquireShared
:使用setHeadAndPropagate
设置头节点并传播tryAcquireShared
返回的值。
6.7 isOnSyncQueue
说明:
- 主要用于Condition流程中的判断,返回节点是否在同步队列中。
// Internal support methods for Conditions
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// waitStatus是CONDITION,或prev是null
return false;
if (node.next != null)
// 根据文章前面对Node的描述,next只会在同步队列中有值。
return true;
/*
因为CAS操作替换值的时候可能会失败,所以有可能出现:
node.prev不为null,但是没在同步队列中。
所以需要从队尾向前再遍历一遍。
*/
return findNodeFromTail(node);
}
/**
从同步队列的tail开始向前遍历,查找是否有节点node。
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
7 public方法
public方法中,同步中需要调用acquire*/release*
相关方法,除此之外,其它一些public方法可能也会需要用到。
下面列出了除acquire*/acquire*
之外的public方法。
7.1 hasQueuedPredecessors
查询同步队列中,是否有比自己更优先的线程在等待。
比自己更优先:其实就是比自己先进入队列,排在自己前面的线程(因为更先进入队列,所以比自己等待时间更长)。
同步队列同时满足下面三个条件,则返回true:
- 1 head/tail都不为null,且head != tail;
- 2 head.next != null;
- 3 head.next.thread != currentThread。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
7.2 hasQueuedThreads
是否还有等待的线程。判断很简单,head != tail
时,就认为还有等待的线程。
public final boolean hasQueuedThreads() {
return head != tail;
}
7.3 setExclusiveOwnerThread
设置当前拥有独占锁的线程,比如Thread.currentThread()。
一般在
tryAcquire
中compareAndSetState
成功后设置;或在tryRelease
中设置setExclusiveOwnerThread(null)
。
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
7.4 getExclusiveOwnerThread
返回
setExclusiveOwnerThread
中设置的当前拥有独占锁的线程。一般用于判断某线程(如当前线程)是否正在持有当前独占锁,如
if (current == getExclusiveOwnerThread()){....}
。
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
7.5 isQueued
判断线程thread是否在同步队列中
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
// 从tail向前查找
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
7.6 getExclusiveQueuedThreads
返回独占模式的队列中等待的线程
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
7.7 getSharedQueuedThreads
返回共享模式的队列中等待的线程
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
8 独占模式同步
独占模式,需要子类的内部类,实现tryAcquire
/tryRelease
,并在lock和unlock时调用acquire/release
,如ReentrantLock.FairSync
。
8.1 acquire
- 1 tryAcquire(arg) 是否成功
- 2 不成功就调用addWaiter创建Node,并调用acquireQueued加入节点
- 3 acquireQueued返回true,表示中断过,则调用selfInterrupt。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
8.2 release
- 1 执行
tryRelease
,成功则继续执行,失败则返回false; - 2 head不为空且
nextWaiter!=0
(0是同步队列中的节点的nextWaiter的初始值),则执行unparkSuccessor
唤醒后继者,然后返回true。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//unpark节点head的后继者
return true;
}
return false;
}
9 共享模式同步
共享模式,需要子类的内部类,实现tryAcquireShared
/tryReleaseShared
,并在lock和unlock时调用acquireShared/tryReleaseShared
,如CountDownLatch.Sync
。
9.1 acquireShared
tryAcquireShared失败,则执行doAcquireShared。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
9.2 releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // head != tail
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // head.waitStatus == Node.SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 如果将head的waitStatus由SIGNAL置换为0失败,则继续循环。
unparkSuccessor(h); // unpark头节点的后续节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 如果head的waitStatus由0置换为PROPAGATE失败,则继续循环。
}
if (h == head) // 如果head没有改变,则退出;否则继续循环。
break;
}
}
10 总结
- 1 AQS使用一个Node链表来实现线程的排队执行,即实现同步的功能;
- 2 对线程的控制,都是使用
LockSupport.park*/LockSupport.unpark*
实现; - 3 用
shouldParkAfterFailedAcquire
来判断是否park,用LockSupport.park*
来实现自旋的目的,如acquireQueued
; - 4 使用
unparkSuccessor
唤醒后继节点以便让后继节点执行,如release
中unparkSuccessor(head)
; - 5 只有前驱节点是head的节点,才能去争夺锁,否则park;
- 6 总的流程,就是添加线程节点到队尾,然后park该线程,等该节点成为head的后继节点时再唤醒,以达到同步器的目的。