LockSupport
作用:唤醒和阻塞线程,用来构建同步工具
- 以park开头的方法为阻塞线程
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
- unpack(Thread thread)唤醒线程
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
AbstractQueuedSynchronizer
AQS使用的设计模式
模板方法(Template Method)
- **AblstractClass(抽象类):**在抽象类中定义了一系列的操作PrimitiveOperation,每个操作可以使具体的,也可以是抽象的,每个操作对应一个算法的步骤,在子类中可以重新定义或实现这些步骤。TmplateMethod()这个方法用于定义一个算法结构,模板方法不仅可以调用在抽象类中实现的基本方法,也可以调用在抽象类的子类中实现的基本方法,还可以调用其他对象中的方法。
- **ConcreteClass(具体子类):**用于实现在父类中声明的抽象基本操作,也可以覆盖在父类中已经实现的具体基本操作。
下面我们可以手写一个模板方法的设计模式
定义一个抽象类
public abstract class SendCustom {
public abstract void from();
public abstract void to();
public abstract void content();
public abstract void send();
public void date() {
System.out.println(new Date());
}
public void sendMessage() {
from();
to();
content();
date();
send();
}
}
定义一个继承类
public class SendSms extends SendCustom {
@Override
public void from() {
System.out.println("from");
}
@Override
public void to() {
System.out.println("to");
}
@Override
public void content() {
System.out.println("content");
}
@Override
public void send() {
System.out.println("send");
}
public static void main(String[] args) {
SendCustom custom = new SendSms();
custom.sendMessage();
}
}
输出如下
from
to
content
Fri Apr 17 09:28:07 CST 2020
send
AQS中的方法
模板方法
-
独占式获取
- aquire
- aquireInterruptly
- tryAquireNanos
-
共享式获取
- aquireShared
- aquireSharedInterruptly
- tryAquireSharedNanos
-
独占式释放
- release
-
共享式释放
- releaseShared
需要覆盖的流程方法
- 独占式获取:tryAcquire
- 独占式释放:tryRelease
- 共享式获取:tryAcquireShared
- 共享式释放:tryReleaseShared
- 这个同步器是否处于独占模式:isHeldExlusively
同步状态:state
- 获取:getState
- 设置:
- setState
- compareAndSetState,使用CAS保证原子性
设置一个类似ReentrantLock的类
public class SelfLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
return 1 == getState();
}
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (0 == getState()) {
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCond() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCond();
}
}
编写一个测试类
public class TestMyLock {
public void test() {
final Lock lock = new SelfLock();
class Worker extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
SleepTools.second(1);
System.out.println(Thread.currentThread().getName());
SleepTools.second(1);
} finally {
lock.unlock();
}
SleepTools.second(2);
}
}
}
for (int i = 0; i < 10; i++) {
Worker worker = new Worker();
worker.setDaemon(true);
worker.start();
}
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
System.out.println();
}
}
public static void main(String[] args) {
TestMyLock testMyLock = new TestMyLock();
testMyLock.test();
}
}
输出如下
Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
AQS中的数据结构-节点和同步队列
如上图所示,AQS是一个双向的链表,每个节点由「前驱」,「后继」指针,还有每个节点包括了一个线程的信息,,采用的是「先进先出」的方式。
竞争失败的线程会打包成Node放到同步队列,Node可能的状态里:
-
CANCELLED:线程等待超时或者被中断了,需要从队列中移走
-
SIGNAL:后续的节点等待状态,当前节点,通知后面的节点去运行
-
CONDITION :当前节点处于等待队列
-
PROPAGATE:共享,表示状态要往后面的节点传播
-
0, 表示初始状态
节点加入队列
队列释放首节点
以下为独占式同步状态的获取与释放
结合源码如下
获取锁
//1 获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//2获取失败生成节点,并加入队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//若前驱不为空 CAS加入对列,并返回
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若前驱为空 ,加入对类返回节点
enq(node);
return node;
}
//加入队列,返回节点
//自旋检查,先判断是否存在头节点,若存在加入队列返回节点,若不存在,初始化头节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//3.自旋判断前驱节点并且能获取到锁,若都能将此节点设为头节点,让后将此节点的后继脱落队列,返回中断为false
//若不能,看第4步,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//4.获取节点的状态,并且修改节点状态,然后阻塞线程,返回中断为true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
释放锁
//能否释放线程,若能判断头节点不为空且头节点状态不为0,就唤醒头节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
共享式同步状态获取与释放
相比于独占式多了一个setHeadAndPropagate(node, r);
的方法,作用是偏移,类似于SemaPhore,若共享线程有10个
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
独占式超时同步状态获取
即在独占模式上面增加了一个等待超时模式
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
实现一个奇葩点的三元共享同步工具类
public class TrinityLck implements Lock {
private Sync sync = new Sync(3);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryReleaseShared(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCond();
}
private static class Sync extends AbstractQueuedSynchronizer{
Sync(int count){
if(count<=0){
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (;;){
int current = getState();
int newCount = current-reduceCount;
if(newCount < 0 || compareAndSetState(current, newCount)){
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
Condition newCond(){
return new ConditionObject();
}
}
}
Condition
一个Condition包含一个等待队列,是一个单项列表
同步队列与等待队列
一个AQS对应了多个Condition,所以signal不多唤醒其他Condition的阻塞线程
await方法
当线程获取到锁的时候,等待Condition,若条件未满足,则吧该节点构造新的Condition节点加入Condition
signal方法
当线程获取到锁的时候,调用了一个Condition的singal,则吧该Condition队列中的一个节点放入AQS队列等待执行。
锁的可重入
我们刚刚写了一个独占式的锁,里面的额state就只只有0,1这样在写递归的时候,若线程a获得锁为释放的时候又要获取锁,理论上就会等待,而线程a必须在第二次获取到锁的时候才能继续执行,这样就死锁了,所以我嗯采取的是在state上面进行累加,而不是直接改变状态,进而记录获取锁的次数,若释放一次就减一次,类似于SemaPore。
非公平锁的实现
我们的公平锁获取锁的第一步是队列是否有尾节点,若存在,继续执行,而非公平锁则是第一步就直接去拿锁,拿不到在做其他的操作,这样非公平锁的效率跟高
ReentrantWriteReadLock的实现
在ReentrantWriteLReadLock中定义了两把锁,但是只有一个Sync对象,那么他的State是怎么记录的呢