Lock锁源码分析

阅读 243
收藏 12
2018-07-19
原文链接:my.oschina.net

前言

在java SE 5之前,是使用  synchronized 来进行控制多线程访问共享资源的,使用 synchronized 修改方法或代码块之后,会隐式的获取到锁,之后方法退出时在隐式的释放锁,而java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。Lock锁拥有了锁获取与释放的可操作性、公平锁与非公平锁,可中断的获取锁以及超时获 取锁等多种synchronized关键 字所不具 备的同步特性。

例子

使用 Lock来进行代码同步非常的简单,只需在加锁的地方调用 lock 方法进行加锁,在解锁的地方调用 unlock 来释放锁,一般在 finally 中调用,以保证锁能够正确的被释放。如下所示:

    private static Lock lock = new ReentrantLock();

    public void testLcok() {
        new Thread(() -> {
            lock.lock(); // 加锁
            try {
                for (int j = 1; j <= 3; j++) {
                    System.out.println("A = " + j);
                }
            } catch (InterruptedException e) {
                    e.printStackTrace();
            }finally {
                lock.unlock(); // 解锁
            }
        }).start();
    }

不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

分析源码

Lock是一个接口,定义了一系列的方法,由子类取实现,如 ReentrantLock,

public interface Lock {
    // 获取锁,调用该方法当前线程获取锁,当获得锁后,从该方法返回
    void lock();
    // 可中断的获取锁,在锁的获取中可以中断当前线程
    void lockInterruptibly() throws InterruptedException;
    // 尝试非阻塞的获取锁,调用该方法后立即放回,如果能获取到锁则放回 ture, 否则返回false
    boolean tryLock();
    // 超时获取锁,当前线程在下面三种情况下放回:
    // 1.在超时之前获取到锁
    // 2.在超时之前被中断
    // 3.超时时间到,则放回false
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 释放锁
    void unlock();
    // 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得锁,才能调用该组件的await()方法,调用后,当前线程释放锁
    Condition newCondition();
}

Lock极其子类是通过一个同步器来进行线程访问控制的,而同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。

先来看看 Lock 中的同步器 AbstractOwnableSynchronizer 的一个类图:

同步器 AbstractQueuedSynchronizer 继承于 AbstractOwnableSynchronizer,而AbstractOwnableSynchronizer  类中只有一个属性及其setter/getter 方法,表示当前线程。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

接下来看看同步器 AbstractQueuedSynchronizer 的实现,AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

可以这样来立即锁和同步器的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

同步器 AbstractQueuedSynchronizer 源码

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 队列中的节点,用它来表示队列,是一个双向列表的形式
    static final class Node {
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;
        // 等待状态值-由于在等待队列中等待的线程等待超时或被中断,需要从等待队列中取消等待,节点进入该状态不会变化
        static final int CANCELLED =  1;
        // 等待状态值-后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或被中断,在通知后继节点,后继节点得以运行
        static final int SIGNAL    = -1;
        // 等待状态值-节点在等待队列中,节点线程等待在 Condition 上,当其他线程到 Condition 调用了 signal 方法后,该节点将从等待队列中转移到同步队列中,加入到对同步状态的获取中
        static final int CONDITION = -2;
        // 等待状态值-表示下一次共享式同步状态获取将被传播下去
        static final int PROPAGATE = -3;
        // 等待状态,只能是上面的值或0
        volatile int waitStatus;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        // 获取同步状态的线程
        volatile Thread thread;
        // 等待队列中的后继节点
        Node nextWaiter;
        // 判断当前节点是否是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        // 返回上一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    }

    // 等待队列的头节点
    private transient volatile Node head;

    // 等待队列的尾节点
    private transient volatile Node tail;

    // 同步器的状态
    private volatile int state;

    // 返回同步器的状态
    protected final int getState() {
        return state;
    }

    // 设置当前同步器状态
    protected final void setState(int newState) {
        state = newState;
    }

    // 从用 CAS 的方式了设置当前同步器的状态,保证设置的原子性
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // 向队列中插入节点,通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 因为是双向队列,tail为null,表示列表为空,
                if (compareAndSetHead(new Node())) // 采用 CAS 的方式设置头节点
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // 从用CAS的方式把节点添加到列表的最后
                    t.next = node;
                    return t;
                }
            }
        }
    }

    //根据模式来插入节点
    private Node addWaiter(Node mode) {
        //创建当前线程的一个节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 队列不为空,则把当前线程的节点CAS的方式插入到队列尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 否则使用 enq() 方法插入
        enq(node);
        return node;
    }

    //设置队列的头结点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    // 唤醒当前节点的后继节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 唤醒后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // LockSupport 是一个工具类,拥有一系列的静态方法来阻塞或唤醒线程,以 park 开头的为阻塞,unpark开头的为唤醒
            LockSupport.unpark(s.thread);
    }

    //共享式释放同步状态,会唤醒后继线程并传播下去
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // 如果当前线程状态为SIGNAL,表示释放同步状态会通知后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 把状态设置为0,表示释放同步状态
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);// 唤醒后继节点
                }
                // 如果当前线程没有获取到同步状态,则把状态设置为可传播状态
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    //当前线程在获取同步状态失败后是否应该被阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    // 阻塞当前线程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    // 主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,获取到同步状态后就可以退出,为什么只有头节点才能获取同步状态,因为头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点,维护FIFO原则
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { 再次尝试获取同步状态,成功则之间返回
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                } 
                // 再次获取同步状态失败后,判断当前线程是否应该被阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 如果是则阻塞当前线程 
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 可中断的获取同步状态,在独占模式下使用
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        // 当前线程节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // 尝试取获取同步状态,成功则返回
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // 获取同步状态失败,则判断是否需要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 阻塞当前线程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    //共享式获取同步状态
    private void doAcquireShared(int arg) {
        // 当前线程
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 当大于等于0时,表示能够获取到同步状态
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 尝试获取同步状态,模版方法,由子类实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放同步状态,模版方法,由子类实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试获取同步状态,共享模式,模版方法,由子类实现
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放同步状态,共享模式,模版方法,由子类实现
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出
    public final void acquire(int arg) {
        // 保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式)并通过addWaiter()方法将该节点加入到同步队列的尾部,最后调用acquireQueued()方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 获取同步状态,可响应中断
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    // 唤醒节点线程,该方法执行时,会唤醒头节点的后继节点线程
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒处于等待状态的线程
            return true;
        }
        return false;
    }
 /*
独占式同步状态获取(acquire)和释放(release)过程总结,在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步
器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
   */    

    // 共享式获取同步状态
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) // 返回值大于等于0时,表示能够获取到同步状态
            doAcquireShared(arg);
    }
    // 共享式获取同步状态, 响应中断
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 共享式释放同步状态
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}


通过以上的源码分析可知,同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再
次尝试获取同步状态。

同步队列的基本结构如下图所示:

当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

节点加入同步队列的过程:

在了解了同步器的实现原理后,接下来再看下 Lock 的实现原理

重入锁ReentrantLock实现原理

重入锁ReentrantLock,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

先看看ReentrantLock的一个类图:

可以看到 ReentrantLock 实现了 Lock 接口,Sync是它的一个内部类,它实现了同步器AbstractQueuedSynchronizer,而 FairSync 又是它的子类,代表公平锁,NonfairSync也是它的子类,代表非公平锁,接下来看看它的一个实现:

public class ReentrantLock implements Lock, java.io.Serializable {
    // 同步器
    private final Sync sync;
  
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        // 由子类实现
        abstract void lock();
        // 以非公平的方式来获取锁,锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 如果当前线程还没有获取锁,则获取
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果当前线程已经获得锁且当前线程为占据锁的那个线程,则将同步状态值进行增加并返回true,表示获取同步状态成功,即可重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires; // 状态值增加
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        // 释放锁,如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        // 判断当前线程是否和获得锁的线程相等
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    // 非公平锁
    static final class NonfairSync extends Sync {
        // lock
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
         // 尝试获取锁
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        
        final void lock() {
            acquire(1); // 调用 tryAcquire() 和 acquireQueued()
        }
        // 尝试获取锁
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // hasQueuedPredecessors 方法表示加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    // 默认为非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    // 指定构造公平或非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    // 向外提供的接口,获取锁
    public void lock() {
        sync.lock();
    }
    // 向外提供的接口,获取锁,可以响应中断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    // 尝试以非公平的方法获取锁,获取到则返回true,否则为false
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    // 超时获取锁,如果在规定的时间内获取不到锁,则返回false
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    // 释放锁
    public void unlock() {
        sync.release(1);
    }
}

Condition接口

任意一个Java对象,都拥有一组监视器方法,主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。Condition对于 synchronized来说具有更好的灵活性,比如可以实现多路通知功能,也就是说在一个Lock对象上可以有多个 Condition 实例,线程对象可以注册到不同的 Condition 实例中,从而可以有选择性的对线程进行通知。

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。
下面看一个例子:

    private ReentrantLock lock = new ReentrantLock();

    public Condition conditionA = lock.newCondition();

    public void awaitA() throws InterruptedException {
        if(lock.tryLock()) {
            System.out.println( System.currentTimeMillis() + ", threadName : "));
            conditionA.await();
            lock.unlock();
        }else{

        }
    }

当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

Condition的实现分析

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

1.等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态,节点的定义使用的是同步器中定义的节点,及Node类。

    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;
      .................
     }

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下:

Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可

2.等待

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入等待队列
            Node node = addConditionWaiter();
            // 释放同步状态,也就是释放锁,唤醒后继节点,
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { // 当前线程已在同步队列中,退出
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 把当前线程加入到队列中
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) { // 释放锁
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }


3.通知

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
 

        public final void signal() {
            if (!isHeldExclusively()) // 当前线程必须获得锁
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    final boolean transferForSignal(Node node) {

        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

 

评论