阅读 460

06 ReentrantLock之Condition

1 Condition的基本使用

在前面学习synchronized 的时候,有讲到 wait/notify的基本使用,结合 synchronized可以实现对线程间的通信,JUC并发包里面提供的同样的功能。

Condition 是一个多线程协调通信的工具类里面提供了await/signal方法,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

1.1 简单案例

public class LockConditionDemo {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(() -> {
            System.out.println("threadA start");
            lock.lock();
            System.out.println("threadA getLock Running");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.unlock();
            System.out.println("threadA end");
        });

        Thread threadB = new Thread(() -> {
            System.out.println("threadB start");
            lock.lock();
            System.out.println("threadB getLock Running");
            condition.signal();
            lock.unlock();
            System.out.println("threadB end");
        });

        threadA.start();
        TimeUnit.SECONDS.sleep(2);
        threadB.start();
    }
}
复制代码

输出结果如下:

threadA start
threadA getLock Running
threadB start
threadB getLock Running
threadB end
threadA end
复制代码

1.2 简单总结

当调用await方法后,当前线程会释放锁并等待,而其他线程调用condition 对象的 signal 或者 signalall 方法通知被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法await:把当前线程阻塞挂起signal:唤醒阻塞的线程

2 Condition基本介绍

2.1 具体类图的UML

image-20200921003258059

关于Condition 的实现类为 AbstractQueuedSynchronizer.ConditionObject 内部类。

先看下condition接口提供哪些定义方法

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}
复制代码

ConditionObject的关键数据结构

private transient Node fristWaiter;

private transient Node lastWaiter;
复制代码

每个ConditionObject,都维护着自己的条件等待队列。

2.2 等待

源码分析之前,先介绍下等待队列。

等待队列是一个FIFO的队列, 在队列中的每个节点都包含了一个线程引用, 该线程就是在Condition对象上等待的线程, 如果一个线程调用了Condition.await() 方法, 那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。一个Condtion包含一个等待队列, Condtion拥有首节点(fristWaiter) 和尾节点(llastWaiter) 。当前线程调用Condition.await()方法, 将会以当前线程构造节点, 并将节点从尾部加入等待队列。等待队列的基本结构如下图所示:

image-20200923235044450

一个Condition包含一个等待队列, Condition拥有首节点(firstWaiter) 和尾节点(lastWaiter) 。当前线程调用Condition.await()方法, 将会以当前线程构造节点, 并将节点从尾部加入等待队列。

2.2.1 await

调用condition.await()方法会使当前线程进入等待队列并释放锁,同时线程变为等待状态。当从await()方法返回时,一定是获得与condition相关联的锁。

当调用condition.await()方法时,同步队列的首节点(获得锁的节点)移动到condition的等待队列。

调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal ()方法唤醒, 而是对等待线程进行中断, 则会抛出InterruptedException。如果从队列的角度去看, 当前线程加入Condition的等待队列, 该过程如下图所示, 同步队列的首节点并不会直接加入等待队列, 而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。

image-20200922234438856

public final void await() throws InterruptedException {
    // 检测当前线程的中断标记,如果中断位为1,则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加等待节点。就是一个简单的链表维护节点的操作,具体参照addConditionWaiter讲解
    Node node = addConditionWaiter();
    // 释放占有的锁,并获取当前锁的state。并唤醒同步队列中的一个线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 当前节点是否在同步队列中,如果在同步阻塞队列中,则申请锁,去执行;
    // 同步队列中的情况—并发:A线程await执行到此处释放锁。B线程singal后(等待节点加入同步节点),
    // 再执行下面的代码。acquireQueued可能失败(再次park即可),也可能成功(B释放lock锁在此之前也执行了)
    // 如果不在同步队列中(在等待队列中),阻塞,等待满足条件
    while (!isOnSyncQueue(node)) {
        // park阻塞,等待满足条件
        LockSupport.park(this);
        // 线程从条件等待被唤醒后,线程要从条件队列移除,进入到同步等待队列。
        // 被唤醒有有如下两种情况,一是条件满足,收到singal信号,二是线程被取消(中断),
        // 如果是被中断,需要根据不同模式,处理中断。处理中断,也有两种方式:1.继续设置中断位;2:直接抛出InterruptedException
        // 1、先singal,再interrupt再,重新中断
        // 2、先interrupt再singal,直接抛出异常
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            // 没有中断,由于从等待队列加入到同步队列。下次循环也会跳出此while
            break;
    }
    // singal唤醒后尝试去申请锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 处理取消等待的节点,从等待队列中移除
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 根据中断标识来进行中断处理
    // 如果是 THROW_IE,则抛出中断异常
    // 如果是 REINTERRUPT,则重新响应中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
复制代码

以上是await的整体方法,大致说明下流程。接下来会对每一个方法进行解析。

2.2.1.1 addConditionWaiter

添加条件等待节点

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后一个等待节点的状态不是Node.CONDITION,则先删除等待队列中节点状态不为Node.CONDITION的节点。
    // 有可能被中断或者等待await到期
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 等待队列中移除取消等待的节点
        unlinkCancelledWaiters();
        // 清除不是Node.CONDITION的节点,尾结点有可能也会相应的改变
        t = lastWaiter;
    }
    // 当前线程构造节点并加入等待队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 构造首节点
    if (t == null)
        firstWaiter = node;
    // 把当前等待线程构造为node加到尾结点后面,如之前的结构图所示
    else
        t.nextWaiter = node;
    // 改变尾结点
    lastWaiter = node;
    return node;
}
复制代码

2.2.1.2 unlinkCancelledWaiters

向等待队列中的LastWaiter加入节点时,LastWaiter不是CONDITION状态,从头遍历等待队列中移除取消等待的节点。

private void unlinkCancelledWaiters() {
    // 首节点
    Node t = firstWaiter;
    // 从头结点往后遍历移除非Node.CONDITION节点
    // 若当前结点不被移除,就用trail临时变量赋值,与下次不被取消的节点形成链表
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 移除队列中不为Node.CONDITION的节点
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
复制代码

2.2.1.3 fullyRelease

彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

释放锁,会唤醒同步队列中的Head节点的下一个节点获得锁。此时同步队列也会变化

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码

2.2.1.4 isOnSyncQueue

如果不在同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用signal 唤醒

如果在同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限。

等待队列中的节点会重新加入到 同步队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 等待队列转移到同步 队列

final boolean isOnSyncQueue(Node node) {
    // 节点的状态为Node.CONDITION 或 node.prev == null,表明该节点在等待队列中,并没有加入同步队列。
    // 一个是根据waitStatus判断,一个根据队列的数据结构(同步对列为双向链表,等待队列为单向链表)。
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 同步队列有pre与next(双向链表)
    // 等待队列为nextWaiter(单向链表)
    if (node.next != null) 
        return true;
    // node.prev不为空,但也不在同步队列中,这个是由于CAS可能会失败(同步队列是先链接Pre节点,在CAS next节点)。
    // 为了不丢失信号,从同步队列中再次选择该节点,如果找到则返回true,否则返回false
    return findNodeFromTail(node);
}
复制代码
// 从尾结点向前遍历,找到node节点
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
复制代码

2.2.1.5 checkInterruptWhileWaiting

前面在分析 await 方法时,线程会被阻塞。而通过 signal被唤醒之后又继续回到上次执行的逻辑中。

检查await被唤醒时,线程有没有被Interrupt。

如果当前线程被中断,则调用transferAfterCancelledWait 方法判断后续的处理应该是抛出 InterruptedException 还是重新中断。

  • 先singal,再interrupt再,重新中断
  • 先interrupt,再singal,直接抛出异常
private int checkInterruptWhileWaiting(Node node) {
    // 线程是否中断,会清除中断标识位。根据判断是否需要重新中断
    return Thread.interrupted() ?
        // 判断当前结点有没有加入同步队列
        // 1、已经加入同步队列中返回false,重新中断(中断在singal之后)
        // 2、没有加入同步队列中,返回true,抛出异常(中断在singal之前)
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
复制代码

2.2.1.6 transferAfterCancelledWait

final boolean transferAfterCancelledWait(Node node) {
    // CAS成功,当前结点还未从等待队列加入同步队列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 把当前结点从等待队列移除并加入到同步队列
        enq(node);
        return true;
    }
    //如果cas失败,则判断当前node是否已经在同步队列上,如果不在,则让给其他线程执行
        //当node被触发了signal方法时,node就会被加到同步队列上
    //循环检测node是否已经成功添加到同步队列中。如果没有,则通过yield让出线程的cpu调度
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
复制代码

2.2.1.7 reportInterruptAfterWait

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // 直接抛出异常
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // 重新中断,线程自己处理
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

static void selfInterrupt() {
     Thread.currentThread().interrupt();
}
复制代码

2.3 通知

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点firstWaiter) , 在唤醒节点之前, 会将节点移到同步队列中。

2.3.1 signal

public final void signal() {
    // 如果当前线程不是锁的持有者,直接抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 通知等待队列中的第一个等待者
    if (first != null)
        doSignal(first);
}
复制代码

2.3.2 doSignal

本方法的主要功能是:被通知的节点从等待队列中移除,并把被通知的节点加入同步队列中。

private void doSignal(Node first) {
    do {
        // 首先将要被通知节点的下一个节点设置为等待队列的firstWaiter
        // 如果当前被通知节点的下一个节点为空
        if ( (firstWaiter = first.nextWaiter) == null)
            // 等待队列的尾节点(lastWaiter)设置为空
            lastWaiter = null;
        // 当前被通知节点的下一个节点设为空,旧的firstWaiter从等待队列中移除
        first.nextWaiter = null;
    } 
    // 如果被通知节点没有进入到同步队列并且等待队列还有不为空的节点,则继续循环通知
    while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
复制代码

2.3.3 transferForSignal

把等待队列中的节点改变WaitStatus后加入同步队列

final boolean transferForSignal(Node node) {
    // 尝试将节点状态设置为0,如果设置失败,则说明该节点的状态已经不是Node.CONDITION,进一步说明该节点在没有等到通知信号时,被取消.
    // 直接返回false,通知下一个等待者。回到上面的while循环
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 将节点放入到同步队列中。主要是将节点从等待队列移入到同步队列中
    // p为同步队列的当前加入节点的上个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 上个节点被取消,唤醒刚刚加入aqs队列的节点
    // 设置前置节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程。
    // 此时不唤醒,只能通过lock.unLock释放锁。唤醒当前结点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
复制代码

3 总结

线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

image-20200923233902398

image-20200923234806591