Condition的await/signal源码实现简析

1,978 阅读5分钟

1 简介

Condition是用于当线程的等待/唤醒操作,实现是AbstractQueuedSynchronizer.ConditionObject,示例代码:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
   condition.await();
   // operations
   condition.signal();
}finally {
   lock.unlock();
}

注意:

  • Condition对象通过Lock.newCondition()创建;
  • 必须在Lock.lock()后才能进行Condition.await操作,原因文章后面会讲。

Condition基于AQS实现,AQS可以参考AQS源码分析及核心方法解析

2 属性

    private transient Node firstWaiter; // 等待队列的第一个节点
        
    private transient Node lastWaiter; // 等待队列的最后一个节点

Condition的Node,只有CONDITION/CANCELLED两种状态。

3 内部方法

3.1 addConditionWaiter

功能:使用 当前线程waitStatus值Node.CONDITION 构造新节点,并添加到条件队列的队尾。

返回:添加的新节点

流程

  • 1 如果lastWaiter==null,说明队列未初始化,则firstWaiter = node
  • 2 如果原来的lastWaiter!=null,则原来的lastWaiter.nextWaiter = node
  • 3 最后,lastWaiter = node,将lastWaiter字段指向node。
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            /* 
            * 如果lastWaiter不是CONDITION状态,则清除掉条件队列中所有CANCELLED状态的节点。
            * 清理后lastWaiter会变。
            */
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            
            // 新建节点,节点初始waitStatus是CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            
            /* 
            t指向了当前的lastWaiter
            如果 t == null 的话,则firstWaiter指向新节点;
            否则把 t.nextWaiter指向新节点,也就是把新节点加都队尾。
            */
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            
            // 最后把属性lastWaiter指向新节点node
            lastWaiter = node;
            return node;
        }

3.2 unlinkCancelledWaiters

功能:清除掉条件队列中所有被取消的节点;

流程

  • 1 从firstWaiter开始,将整个链表中t.waitStatus != Node.CONDITION的节点移除掉;
  • 2 节点移除后,将其前置节点的nextWaiter指向后置节点。
 private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
            
                if (t.waitStatus != Node.CONDITION) {
                
                /*
                *当前节点状态不是CONDITION:
                *当前节点的nextWaiter设置为null。
                * 如果trail是空,则将firstWaiter指向之前保存的t.nextWaiter,
                * 否则将trail.nextWaiter指向之前保存的t.nextWaiter。
                */
                
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

3.3 fullyRelease

功能:用当前的当前state作为参数,调用release(目的是释放锁)。

返回:释放之前的state的值。

流程

  • 1 通过getState()获取当前state;
  • 2 执行release(int)
  • 3 成功则返回释放之前的state值,失败则node.waitStatus = Node.CANCELLED
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

3.4 isOnSyncQueue

功能:判断节点是否在同步队列中。

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            // waitStatus是CONDITION,或prev是null
            return false;
        if (node.next != null) 
            // 根据文章前面对Node的描述,next只会在同步队列中有值。
            return true;
        /*
           因为CAS操作替换值的时候可能会失败,所以有可能出现:
           node.prev不为null,但是没在同步队列中。
           
           所以需要从队尾向前再遍历一遍。
        */
        return findNodeFromTail(node);
    }

    /**
     从同步队列的tail开始向前遍历,查找是否有节点node。
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

3.5 transferForSignal

功能:将一个Condition队列节点,转成同步队列节点,然后加入到同步队列中,最后将该节点在同步队列中的前置的waitStatusCAS为Node.SIGNAL

返回:参数node是取消状态(waitStatus != Node.CONDITION)返回false,否则返回true。

流程

  • 1 先把节点的waitStatus从Node.CONDITION设置为0,设置失败则返回false;
  • 2 调用 enq 方法将该节点加入同步队列的队尾,并返回该节点在同步队列中的前置节点
  • 3 如果前置节点的waitStatus是CANCELLED状态,则唤醒节点node所持线程;
  • 4 否则,对前置节点waitStatusCAS为Node.SIGNAL,失败则唤醒节点node所持线程。

    final boolean transferForSignal(Node node) {
        
        // 将node.waitStatus通过CAS由Node.CONDITION置换为0,失败则返回false。
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
        node加入同步队列;
        返回值p是同步队列的原队尾,即现在node在同步队列中的前置节点。
        */
        Node p = enq(node);
        int ws = p.waitStatus;
        
        /*
        ws是node在同步队列中的前置节点的waitStatus值;
        ws > 0表示p是CANCELLED状态;
        ws <= 0时执行CAS,将p的状态置换为Node.SIGNAL。
        
        如果p是CANCELLED状态,或 CAS为Node.SIGNAL失败,则unpark当前节点的线程。
        */
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

3.6 doSignal

功能:通过调用transferForSignal:将参数first转换成同步队列节点(waitStatus==0),加入到同步队列中,同时将其在同步队列中的前置节点waitStatus置为Node.SIGNAL

流程

  • 1 将参数first的nextWaiter赋给firstWaiter备用;
  • 2 调用 transferForSignal(first)
  • 3 如果 transferForSignal成功 则结束;否则将之前firstWaiter(指向之前的first.nextWaiter)赋给first重新循环。
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

4 await

4.1 执行过程

  • 1 执行addConditionWaiter()创建新的CONDITION状态节点,加入条件队列队尾,并返回该节点;
  • 2 执行fullyRelease释放掉当前state的值(即置为0),并将释放前的state值保存到savedState;失败的话,新节点waitStatus置为CANCELLED;
  • 3 通过isOnSyncQueue,判断是否在同步队列中,不在则执行park
  • 4 被其他线程unpark后,用之前保存savedState的值进行acquireQueued(node, savedState)
 
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            
            // 添加新节点到队尾
            Node node = addConditionWaiter();
            // 释放state
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            
            // 节点不在同步队列中
            while (!isOnSyncQueue(node)) {
                
                //park
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

4.2 Lock.lock后才能await?

Condition.await调用的fullyRelease中,会调用release,进而会调用tryRelease,ReentrantLock中tryRelease会进行如下判断:

if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();

而在执行Lock.lock()之前,getExclusiveOwnerThread()返回null,所以:

执行Lock.lock之后才能执行Condition.await

5 signal

  • 判断当前线程是否独占了锁,是则执行doSignal
  • isHeldExclusivelyReentrantLock中的实现是return getExclusiveOwnerThread() == Thread.currentThread();
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }