并发编程之显式条件

143 阅读7分钟

我们之前介绍 synchronized 关键字语义的时候说过,synchronized 虽然不需要我们手动的加锁和释放锁了,但不代表他没有用到锁。同时,我们说每个对象本身结构中也内置了阻塞队列,线程持有器,锁重入计数器等字段。

所以,与其说是 synchronized 实现了「自动化的锁机制」,不如说是 synchronized 借助了我们 Java 中的对象结构实现了了「自动化的锁机制」。

虽然,我们通过 synchronized 对线程实现了自动化的阻塞与唤醒,但是对于已经获得锁的线程来说,如果在他们的执行期间缺少了某些条件以继续执行,比如调用了数据库服务等待数据回显,那么我们从 CPU 的使用效率来看是不应该让当前线程继续持有 CPU 空等待的。

wait 方法使用在 synchronized 内部,专门用于将那些已经获得锁但由于缺乏某些条件不能继续执行的线程阻塞到另一个队列上,并释放锁及 CPU。同理,notify 方法就是从等待的队列上释放一个线程以标识它的条件可能满足了,让它尝试重新竞争锁。

而在我们的显式锁中,对应 wait/notify 语义的就是我们本篇要讨论的『显式条件』,我们一起来看看。

实现原理

在探究『显式条件』的实现原理之前,我们先通过一个小的代码 demo,看看显式条件是如何使用的。

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

Thread thread1 = new Thread(){
    @Override
    public void run(){
        lock.lock();
        System.out.println("preparing waiting.....");
        try {
            condition.await();
        } catch (InterruptedException e) {}
        System.out.println("waiting end.......");
                
        lock.unlock();
    }
};
thread1.start();

为了缩短篇幅,没给全方法体,只抽出来核心的部分代码,具体完整的代码,大家可以去 github 自行查看下载。

得到一个显式条件,还是很简单的,我们只要通过 ReentrantLock 的 newCondition 方法即可获得一个条件对象。接着,在获取到锁之后如果遇到某些条件不满足,不能继续执行了,直接调用 Condition 实例的 await 方法即可,释放一个条件队列上的线程调用 signal 即可,不再赘述。

这里需要注意一点的是,正如同 synchronized 中使用 wait/notify 方法一样,condition 的两个方法 await 和 signal 也一样是需要在 lock 方法之后调用的,也即是必须先获得锁才有机会被条件等待

下面我们看实现原理,先从 newCondition 方法开始:

public Condition newCondition() {
    return sync.newCondition();
}

直接透传调用的 AQS 中的 newCondition 方法:

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject 是 AQS 中定义的一个内部类,并实现了 Condition 接口,是一个真正的显式条件实现者。我们直接看看它的 await 方法:

await方法实现

我用不同颜色的实线将 await 方法切分成了四个模块,每一个模块我们进行一个整体上的概括,具体的代码实现大家自行研究,也欢迎加我微信讨论。

分析之前首先需要明确等待队列和阻塞队列虽然共用的同一个节点类 Node,但是确实两个完全不同的队列。阻塞队列使用 next 指针链接下一个节点,等待队列使用的是 nextWaiter 指针链接下一个节点。

这一点需要有一个前提认识,不然等你分析具体代码实现的时候,你会不知所措的。 下面我们总结这个四个部分的大致逻辑:

  1. 将当前线程包装成节点追加到等待队列的尾部,因为这个时候还没有释放锁,所以追加过程是无并发风险的,接着释放自己持有的显式锁。
  2. 如果条件满足了或是被其他线程移除出等待队列了,那么 isOnSyncQueue 就会返回 true,结束循环并尝试第三步,否则将会被阻塞当前线程并等待唤醒。
  3. 从等待队列中移除之后依然需要先尝试获取显式锁,接着才能返回到当初被阻塞的调用处。
  4. 处理中断,抛出异常或是设置中断标志位。

这就是 await 方法的实现逻辑,再简洁一点的概括就是:

释放持有的锁-->阻塞自己等待被唤醒-->被唤醒后先尝试获取锁-->处理下中断-->方法返回

总的来说,并不难理解,接着我们看 signal 方法的实现逻辑:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

如果自己并没有持有锁而试图去释放等待队列上的线程节点,直接抛出异常,拒绝访问。反之,拿到等待队列头节点,并调用 doSignal 释放一个阻塞的节点线程。

doSignal 方法的实现如下:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
        (first = firstWaiter) != null);
}

方法逻辑主要分为两个部分,一个是循环体,一个是循环条件。循环体做的事情就是置换出来第等待队列上的第一个节点,让它与队列脱钩。循环条件里面的逻辑就是,尝试将刚才脱钩的节点转移到阻塞队列上。

如果转移失败了,只有一种可能,就是当前需要被转移的节点的等待状态不再是 CONDITION,也即是他的等待状态已经被取消,所以我们就不需要关心它了,顺着等待队列找到下一个有效的节点,尝试去转移并唤醒其对应的线程。

整个循环只有成功释放了一个节点线程,才能结束。

这样,我们对于 condition 中的两个核心的方法原理就分析完了,相信你一定有所了解了,至于其他的一些 signalAll,awaitNanos 以及 await 的几个重载方法来说,他们大多离不开以上分析的两个方法,这里就不再赘述了。

生产者消费者模型实现

下面我们应用一下上面介绍的『显式条件』,通过实现一个经典的并发模型场景,之前我们是通过 wait/notify 实现的,生产者和消费者公用了同一个条件等待队列,相对来说是不太合适的,效率是不如我们的显式条件的。

为什么这么说呢?

因为我们的显式条件依附于显式锁,是可以创建多个的,所以对于生产者与消费者来说,我们可以创建两个不同的条件等待队列分别来阻塞条件不满足的线程,唤醒的时候也可以「对症下药」,不需要同时唤醒所有的生产者与消费者。这一点,我希望你能有所体会。

Repertory仓库类

我们定义一个仓库类,提供添加和消费产品的能力,当然这两种方式是并发安全的。对于生产方法来说,当仓库满了则不能继续生产产品,而需要在等待队列上进行等待。

对于消费方法来说,当仓库为空是则不能继续消费产品,而需要在另一个等待队列上进行等待。

当然,如果成功生产了一个产品,将尝试唤醒所有消费者,告诉他们仓库有产品了,反之,如果成功消费了一个产品,将尝试唤醒所有的生产者,告诉他们仓库中有空余位置了,你们可以继续生产了。

这样,当我们同时创建大量的生产者与消费者,并让他们并发调用这两个方法,消费者与生产者的足迹会交替的出现在控制台上。这部分代码我已经写好了,并且测试过了,因为限于篇幅且不是核心代码,这里就不贴出来了,大家可以自行尝试编写或从我的 github 上下载我已经完成的实现,欢迎你给我提出建议!

关于锁的相关内容,我们大致介绍完了,下一篇将介绍 Java 中的异步任务与线程池的概念。

关注公众不迷路,一个爱分享的程序员。 公众号回复「1024」加作者微信一起探讨学习! 每篇文章用到的所有案例代码素材都会上传我个人 github github.com/SingleYam/o… 欢迎来踩!

YangAM 公众号