Java并发4:锁

244 阅读20分钟

Lock接口

在Lock接口出现之前,Java程序依靠 synchronized 关键字实现锁的功能。锁提供了类似的同步功能,只是在使用时需要显式获取和释放锁,同时还拥有了锁的获取释放的操作性、可中断的获取锁以及超时获取锁等多种 synchronized 不具备的同步特性。

Lock接口提供的 synchronized 接口不具备的特性:

  • 尝试非阻塞的获取锁:当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
  • 能被中断的获取锁:与 synchronized 不同,获取到锁的线程能响应中断,当获取到锁的线程被中断,中断异常被抛出,同时释放锁
  • 超时获取锁:在指定的截止时间之前获取锁,如果时间到了仍然无法获取锁,返回

Lock的常用方法:

  • void lock() 获取锁,获取锁后返回
  • void lockInterruptibly throws InterruptedException可中断的获取锁,在锁的获取中可以中断当前线程
  • boolean tryLock()尝试非阻塞的获取锁,调用后立即返回,获取了返回true,否则返回false
  • boolean tryLock(long time,TimeUnit unit) throws InterruptedException超时获取锁。当前线程在时间内获得了锁,返回true;当前线程在时间内被中断,抛出异常;超出时间,返回false
  • void unlock()释放锁
  • Condition newCondition()获取等待通知组件,该组件与当前的锁绑定。当前线程获取了锁,才能调用该组件的wait()方法,调用后,当前线程将释放锁

队列同步器AQS

队列同步器(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列完成资源获取线程的排队工作。

AQS 接口

同步器的时机是基于模板方法模式,使用者需要继承同步器并重写特定方法。重写时,使用以下三个方法访问或修改同步状态:

  • getState()获取当前同步状态
  • setState(int newState)设置当前同步状态
  • compareAndSetState(int expect,int update)使用CAS设置当前状态,该方法能保证状态设置的原子性

除此之外,同步器提供了模板方法,分为三类:独占式获取与释放同步状态,共享式获取与释放同步状态,查询同步队列中的等待线程情况。

AQS实现分析

同步队列

AQS 内部依赖一个同步的 FIFO 双向队列来完成同步状态的管理。当前线程获取同步状态失败时,将当前线程以及等待状态等信息构造成一个节点并将其加入同步队列,同时阻塞该线程;当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中,一个几点代表一个线程,保存着线程的引用、等待状态、前驱和后继节点。

  • int waitStatus 等待状态
  • Node prev 前驱
  • Node next 后继
  • Node nextWaiter 等待队列中的后继节点
  • Thread thread 获取同步状态的线程

等待状态包含如下:

  • CANCELLED: 在同步队列中等待的线程超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
  • SIGNAL: 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者取消,将会通知后继节点,使得后继节点的线程得以运行
  • CONDITION:节点在等待队列中,节点线程等待在 Condition 上,当其他线程对 Condition 调用了 signal() 方法后,该节点会从等待队列转移到同步队列中
  • PROPAGATE:表示下一次共享式同步状态获取将无条件传播下去
  • INITIAL:初始状态

入队

同步器提供了一个基于CAS的设置尾节点的方法compareAndSetTail(Node expect,Node update),传递的两个参数是当前线程认为的尾节点和当前的节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

出队

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,会唤醒后继节点,后继节点将会在获取同步状态成功时将自己设置为首节点。

独占式同步状态获取与释放

独占式也就是同一时刻仅有一个线程持有同步状态。

同步状态获取

独占式同步状态获取采用acquire(int arg)方法。该方法对中断不敏感,由于线程获取同步状态失败加入到同步队列中,后序对线程进行中断操作时,线程不会从队列中移除。

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&//获取同步状态
        //首先生成节点加入队列
        //然后等待前驱节点成为头节点并获取同步状态
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • tryAcquire:尝试获取锁,获取成功则设置锁状态并返回true,否则返回false
  • addWaiter:如果获取锁失败,调用该方法将当前线程加入到同步队列尾部
  • acquireQueued:当前线程根据公平性原则进行阻塞等待(自旋,也就是死循环),直到获取锁为止。返回当前线程在等待过程中有没有中断过。
  • selfInterrupt:产生一个中断
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter(Node mode) 用来构造节点以及加入同步队列。通过使用compareAndSetTail(Node expect,Node update)来确保节点能被线程安全添加。在enq(final Node node)方法中,使用死循环保证节点的正确添加。在死循环中,只有通过CAS将节点设置为尾节点之后,当前线程才能从该方法返回。

此时,节点进入同步队列之后,进入了一个自选的过程。当条件满足,获得了锁,退出队列。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //当前驱是头结点,尝试获取同步状态
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在该方法中,只有前驱是头节点才能尝试获取同步状态。原因有两个:1. 头节点是成功获取到同步状态的节点,头节点线程释放锁,唤醒其后继节点;2. 维护同步队列的FIFO原则。

整体流程如下图所示:

同步状态释放

当前线程获取同步状态并执行了相应逻辑后,需要释放同步状态,使得后续节点能继续获取同步状态。调用release(int arg)方法释放同步状态,在释放同步状态之后,会唤醒其后继节点。

public final boolean release(int arg) {
        if (tryRelease(arg)) {//释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒头节点的后继节点
            return true;
        }
        return false;
    }

总结:获取同步状态时,维护一个同步队列,获取状态失败的线程加入队列并进行自旋;移出队列的条件是前驱为头节点且成功获取同步状态。释放时,先释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

同步状态获取

AQS提供了acquireShared(int arg)共享式获取同步状态

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //前驱是头结点,获取同步状态
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireShared(int arg)方法中,首先调用tryAcquireShared(int arg)尝试获取同步状态,返回一个int,当返回值大于等于0,表示能获取到同步状态。否则,获取失败调用doAcquireShared(int arg)自旋获取同步状态。

同步状态释放

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

该方法释放同步状态后,将会唤醒后续处于等待状态的节点。因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。

独占式超时获取同步状态

AQS提供了tryAcquireNanos(int arg,long nanos)方法,是acquireInterruptibly(int arg)的增强。除了响应中断之外,还有超时控制。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

其中,doAcquireNanos(arg, nanosTimeout)用来超时获取同步状态。

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //获取失败
                //重新计算需要的休眠时间
                nanosTimeout = deadline - System.nanoTime();
                //超时返回
                if (nanosTimeout <= 0L)
                    return false;
                //未超时,等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //中断处理
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

与独占式获取同步状态的区别在于未获取到同步状态时的处理逻辑。独占式在未获取到同步状态时,会使当前线程一致处于等待状态,而超时获取会使当前线程等待nanosTimeout纳秒,如果没有获取到,将返回。

可重入锁ReentrantLock

可重入锁也就是支持重新进入的锁,它表示该锁能支持一个线程对资源的重复加锁。它可以等同于 synchronized 的使用(synchronized 隐式支持重入),但是提供了比 synchronized 更强大更灵活的锁机制,可以减少死锁发生的概率。

ReentrantLock 还提供了公平锁和非公平锁的选择,构造方法中接受一个可选的公平参数,默认是非公平的。公平锁也就是等待时间最长的线程最优先获取锁。但是公平锁的效率往往没有非公平的概率高。

ReentrantLock 有一个内部类 Sync,Sync 继承AQS,有两个子类:公平锁 FairSync 和非公平锁 NonfairSync。

获取锁

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            //锁处于空闲状态
            if (c == 0) {
            //获取锁成功,设置为当前线程所有
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判断锁持有的线程是否是当前线程
            //如果是持有锁的线程再次请求,将同步状态值进行增加并返回true
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

首先判断同步状态,如果为0说明还没有被线程持有,通过CAS获取同步状态,如果成功返回true。否则,判断当前线程是否为获取锁的线程,如果是则获取锁,成功返回true。成功过去锁的线程再次获取锁,并将同步状态值增加。

释放锁

protected final boolean tryRelease(int releases) {
        //减掉releases
        int c = getState() - releases;
        //如果释放的不是持有锁的线程,抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //state == 0 表示已经释放完全了,其他线程可以获取同步状态了
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

公平锁

如果一个锁是公平的,那么按照请求的绝对时间顺序也就是FIFO进行锁的获取。公平锁的获取方法如下:

 protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

与非公平锁获取锁的方法不同,两者区别在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()。该方法用来判断当前线程是否位于CLH同步队列中的第一个,是则返回true,否则返回false。

公平锁保证了锁的获取按照FIFO原则,代价是进行大量的线程切换。非公平锁虽然可能造成线程“饥饿”,但是极少的线程切换,保证了其更大的吞吐量。

与 synchronized 比较

  1. synchronized 是JVM实现的, ReentrantLock 是JDK实现的。
  2. 当持有锁的线程长期不释放锁,正在等待的线程可以选择放弃等待,ReentrantLock 可以中断,synchronized 不可中断。
  3. synchronized 是非公平锁,ReentrantLock 默认是非公平,但是可以设置为公平锁。
  4. ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
  5. 除非要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

读写锁

ReentrantLock 是排他锁,同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,写线程访问时,其他的线程均被阻塞。通过读锁和写锁分离,使得并发性相比一般的排他锁有了很大提升。

读写锁的主要特性:

  1. 公平性:支持公平性锁和非公平锁
  2. 重进入:支持重入。读线程获取读锁后可以再次获取读锁。写线程获取写锁后能再次获取写锁,也能获取读锁。
  3. 锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能降级成为读锁

接口示例

ReentrantReadWriteLock 实现了接口 ReadWriteLock,维护了一对相关的锁。

public interface ReadWriteLock {
    Lock readLock();//返回读锁
    Lock writeLock();//返回写锁
}

ReentrantReadWriteLock 使用了三个内部类

 /** 内部类  读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类  写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
    
/** 同步器 */
final Sync sync;

构造方法如下:

/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

读写锁同样依赖AQS实现同步功能,读写状态就是其同步器的同步状态。因此其同步器需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。

在 ReentrantLock 中使用一个 int 类型的 state 表示同步状态,表示锁被一个线程重复获取的次数。读写锁需要用一个变量维护多种状态,所以采用了“按位切割使用”的方式维护这个变量。读写锁将变量切分为两个部分,高16位表示读,低16位表示写。分割之后,读写锁通过位运算确定读和写各自的状态。假设当前状态为S,写状态等于 S&0X0000FFFF (抹去高16位),读状态等于 S>>>16 (无符号补0右移16位)。写状态增加1,S = S+1,读状态加1,S = S + ( 1 << 16 )。

写锁

写锁是一个支持重进入的排它锁。

获取

 protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //当前同步状态
        int c = getState();
        //写锁状态
        int w = exclusiveCount(c);
        if (c != 0) {
            //c != 0 && w == 0 表示存在读锁
            //当前线程不是已经获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超出最大范围
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //是否需要阻塞
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //设置获取锁的线程为当前线程
        setExclusiveOwnerThread(current);
        return true;
    }

首先获取同步状态和写锁的同步状态。如果存在读锁,或者当前线程不是持有写锁的线程,不能获得写锁。如果能获取写锁,且未超出最大范围,则更新同步状态并返回true。

释放

写锁的释放与 ReentrantLock 释放类似,每次释放减少写状态,当写状态为0表示写锁已经被释放。

// WriteLock类提供了unlock()释放写锁
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {//调用AQS方法释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//这是定义在Sync类中的方法
protected final boolean tryRelease(int releases) {
    //释放的线程不为锁的持有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //若写锁的新线程数为0,则将锁的持有者设置为null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

读锁

读锁是一个支持重进入的共享锁,能被多个线程同时获取。

在写状态为0时,读锁总会被成功获取,然后增加读状态。如果当前线程已经获取了读锁,则增加读状态;如果获取读锁时,写锁已经被其他线程获取,则进入等待状态。读状态时所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能保存在 ThreadLocal 中。

读锁的每次释放均减少读状态。

锁降级

锁降级指的是写锁降级为读锁。锁降级是指把持有的写锁把持住,再获取到读锁,然后释放拥有的写锁的过程。也就是需要遵循先获取写锁、获取读锁再释放写锁的次序才可以称为锁降级。

锁降级中读锁的获取是必要的,主要是为了保证数据的可见性。如果当前线程A不获取读锁而是直接释放写锁,此刻另一线程B获取了写锁并修改了数据,那么当前线程A无法感知线程B的数据更新。如果当前线程A遵循锁降级的规则,则线程B会被阻塞,直到当前线程A使用数据并释放读锁之后,线程B才能获取写锁进行数据更新。

Condition 接口

在 synchronized 控制同步时,配合 Object 的 wait(), notify(), notifyAll() 等方法实现等待/通知模式。Lock 提供了条件 Condition 接口,两者配合实现了等待/通知模式。

Condition使用

Condition 定义了等待/通知两种类型的方法,线程调用这些方法时,需要提前获取 Condition 关联的锁。Condition 对象是由 Lock 对象创建出来的。

public class ConditionCase {
    Lock lock=new ReentrantLock();
    Condition condition=lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal(){
        lock.lock();
        try{
            condition.signal();
        }finally {
            lock.unlock();
        }
    }
}

将 Condition 作为成员变量,调用 await() 方法造成当前线程在接到信号或被中断之前一直处于等待状态。调用signal()或者signalAll()方法会唤醒一个或所有的等待线程,能够从等待方法返回的线程必须获取与 Condition 相关的锁。

Condition的实现

通过 Lock 的newCondition()方法获取 Condition。Condition 是一个接口,其为一个的实现类是 ConditionObject,且是同步器AQS的内部类。

等待队列

每个 ConditionObject 包含一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程。当一个线程调用 await()方法,那么该线程将释放锁,构造成节点加入等待队列并进入等待状态。此处的节点依然是AQS的内部类 Node。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    //头节点
    private transient Node firstWaiter;
    //尾节点
    private transient Node lastWaiter;

    public ConditionObject() {
    }

    /** 省略方法 **/
}

Condition 拥有头节点和尾节点的引用。当一个线程调用await()方法,将该线程构造成一个几点,加入队列尾部。

在 Object 的监视器模型上,一个对象拥有一个同步队列和一个等待队列;并罚保中的同步器拥有一个同步队列和多个等待队列,每个 Condition 对应一个等待队列。

等待

调用 Condition 的await()系列方法将使当前线程释放锁并进入等待状态。当从该方法返回时,当前线程一定获取了 Condition 相关的锁。从队列的角度看,当调用该方法时,相当于同步队列的首节点(也就是获取了锁的节点)移动到 Condition 的等待队列中。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //将当前线程加入等待队列
            Node node = addConditionWaiter();
            //释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //检查该节点的线程是否在同步队列上,如果不在,还不具备竞争锁的资格,继续等待
            while (!isOnSyncQueue(node)) {
                //挂起线程
                LockSupport.park(this);
                //线程已经中断则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //竞争同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

调用该方法的线程是同步队列中的首节点(获取锁的线程)。该方法将当前线程构造成节点加入等待队列,释放同步状态,唤醒后继节点,然后当前线程进入等待状态。然后不断监测该节点代表的线程是否出现在同步队列中(也就是收到了signal信号),如果不是,则挂起;否则开始竞争同步状态。

通知

调用 Condition 的 signal()方法,将会唤醒在等待队列的首节点。在唤醒节点之前,将节点移入同步队列中。

  public final void signal() {
        //检测当前线程是否为拥有锁的独
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //头节点,唤醒条件队列中的第一个节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //唤醒
    }

该方法首先会判断当前线程是否已经获取了锁,然后唤醒等待队列中的头节点,具体来说,先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中。当节点移动到同步队列中,当前线程再唤醒该节点的线程。

总结

一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

示例

//生产者-消费者问题
public class ConditionCase {
    private LinkedList<String> buffer;
    private int maxSize;
    private Lock lock;
    private Condition fullCondition;
    private Condition notFullCondition;

    public ConditionCase(int maxSize){
        this.maxSize=maxSize;
        buffer=new LinkedList<>();
        lock=new ReentrantLock();
        fullCondition=lock.newCondition();
        notFullCondition=lock.newCondition();
    }

    public void set(String string) throws InterruptedException {
        lock.lock();
        try {
            while(maxSize==buffer.size()){
                notFullCondition.await();
            }
            buffer.add(string);
            fullCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public String set() throws InterruptedException {
        String string;
        try {
            lock.lock();
            while(buffer.size()==0){
                fullCondition.await();
            }
            string=buffer.poll();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return string;
    }
}


参考资料