阅读 1856

java并发编程系列:牛逼的AQS(下)

标签: 「我们都是小青蛙」公众号文章

Condition

ReentrantLock的内部实现

看完了AQS中的底层同步机制,我们来简单分析一下之前介绍过的ReentrantLock的实现原理。先回顾一下这个显式锁的典型使用方式:

Lock lock = new ReentrantLock();
lock.lock();
try {
    加锁后的代码
} finally {
    lock.unlock();     
}
复制代码

ReentrantLock首先是一个显式锁,它实现了Lock接口。可能你已经忘记了Lock接口长啥样了,我们再回顾一遍:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
复制代码

其实ReentrantLock内部定义了一个AQS的子类来辅助它实现锁的功能,由于ReentrantLock是工作在独占模式下的,所以它的lock方法其实是调用AQS对象的aquire方法去获取同步状态,unlock方法其实是调用AQS对象的release方法去释放同步状态,这些大家已经很熟了,就不再赘述了,我们大致看一下ReentrantLock的代码:

public class ReentrantLock implements Lock {

    private final Sync sync;    //AQS子类对象
    
    abstract static class Sync extends AbstractQueuedSynchronizer { 
        // ... 为节省篇幅,省略其他内容
    }
    
    // ... 为节省篇幅,省略其他内容
}
复制代码

所以如果我们简简单单写下下边这行代码:

Lock lock = new ReentrantLock();
复制代码

就意味着在内存里创建了一个ReentrantLock对象,一个AQS对象,在AQS对象里维护着同步队列head节点和tail节点,不过初始状态下由于没有线程去竞争锁,所以同步队列是空的,画成图就是这样:

image_1c3hf30h3bmodidrvogfe11oh2q.png-16.3kB

Condition的提出

我们前边唠叨线程间通信的时候提到过内置锁的wait/notify机制,等待线程的典型的代码如下:

synchronized (对象) {
    处理逻辑(可选)
    while(条件不满足) {
        对象.wait();
    }
    处理逻辑(可选)
}
复制代码

通知线程的典型的代码如下:

synchronized (对象) {
    完成条件
    对象.notifyAll();、
}
复制代码

也就是当一个线程因为某个条件不能满足时就可以在持有锁的情况下调用该锁对象的wait方法,之后该线程会释放锁并进入到与该锁对象关联的等待队列中等待;如果某个线程完成了该等待条件,那么在持有相同锁的情况下调用该锁的notify或者notifyAll方法唤醒在与该锁对象关联的等待队列中等待的线程。

显式锁的本质其实是通过AQS对象获取和释放同步状态,而内置锁的实现是被封装在java虚拟机里的,我们并没有讲过,这两者的实现是不一样的。而wait/notify机制只适用于内置锁,在显式锁里需要另外定义一套类似的机制,在我们定义这个机制的时候需要整清楚:在获取锁的线程因为某个条件不满足时,应该进入哪个等待队列,在什么时候释放锁,如果某个线程完成了该等待条件,那么在持有相同锁的情况下怎么从相应的等待队列中将等待的线程从队列中移出

为了定义这个等待队列,设计java的大叔们在AQS中添加了一个名叫ConditionObject的成员内部类:

public abstract class AbstractQueuedSynchronizer {
    
    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;

        // ... 为省略篇幅,省略其他方法
    }
}
复制代码

很显然,这个ConditionObject维护了一个队列,firstWaiter是队列的头节点引用,lastWaiter是队列的尾节点引用。但是节点类是Node?对,你没看错,就是我们前边分析的同步队列里用到的AQS的静态内部类Node,怕你忘了,再把这个Node节点类的主要内容写一遍:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}
复制代码

也就是说:AQS中的同步队列和自定义的等待队列使用的节点类是同一个

又由于在等待队列中的线程被唤醒的时候需要重新获取锁,也就是重新获取同步状态,所以该等待队列必须知道线程是在持有哪个锁的时候开始等待的。设计java的大叔们在Lock接口中提供了这么一个通过锁来获取等待队列的方法:

Condition newCondition();
复制代码

我们上边介绍的ConditionObject就实现了Condition接口,看一下ReentrantLock锁是怎么获取与它相关的等待队列的:

public class ReentrantLock implements Lock {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        // ... 为节省篇幅,省略其他方法
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    // ... 为节省篇幅,省略其他方法
}
复制代码

可以看到,其实就是简单创建了一个ConditionObject对象而已~ 由于 ConditionObject 是AQS 的成员内部类,所以在创建的 ConditionObject 对象中持有 AQS 对象的引用,所以通过 ConditionObject 对象访问到 同步队列,也就是可以重新获取同步状态,也就是重新获取锁 。用文字描述还是有些绕,我们先通过锁来创建一个Condition对象:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
复制代码

由于在初始状态下,没有线程去竞争锁,所以同步队列是空的,也没有线程因某个条件不成立而进入等待队列,所以等待队列也是空的,ReentrantLock对象、AQS对象以及等待队列在内存中的表示就如图:

image_1c3hji5a4uvn1sdj1ro33hm87m61.png-26.7kB

当然,这个newCondition方法可以反复调用,从而可以通过一个锁来生成多个等待队列

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
复制代码

那接下来需要考虑怎么把线程包装成Node节点放到等待队列的以及怎么从等待队列中移出了。ConditionObject成员内部类实现了一个Condition的接口,这个接口提供了下边这些方法:

public interface Condition {
    void await() throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void awaitUninterruptibly();
    void signal();
    void signalAll();
}
复制代码

来看一下这些方法的具体意思:

方法名 描述
void await() 当前线程进入等待状态,直到被通知(调用signal或者signalAll方法)或中断
boolean await(long time, TimeUnit unit) 当前线程在指定时间内进入等待状态,如果超出指定时间或者在等待状态中被通知或中断则返回
long awaitNanos(long nanosTimeout) 与上个方法相同,只不过默认使用的时间单位为纳秒
boolean awaitUntil(Date deadline) 当前线程进入等待状态,如果到达最后期限或者在等待状态中被通知或中断则返回
void awaitUninterruptibly() 当前线程进入等待状态,直到在等待状态中被通知,需要注意的时,本方法并不相应中断
void signal() 唤醒一个等待线程。
void signalAll() 唤醒所有等待线程。

可以看到,Condition中的await方法和内置锁对象的wait方法的作用是一样的,都会使当前线程进入等待状态,signal方法和内置锁对象的notify方法的作用是一样的,都会唤醒在等待队列中的线程。

像调用内置锁的wait/notify方法时,线程需要首先获取该锁一样,调用Condition对象的await/siganl方法的线程需要首先获得产生该Condition对象的显式锁。它的基本使用方式就是:通过显式锁的 newCondition 方法产生Condition对象,线程在持有该显式锁的情况下可以调用生成的Condition对象的 await/signal 方法,一般用法如下:

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

//等待线程的典型模式
public void conditionAWait() throws InterruptedException {
    lock.lock();    //获取锁
    try {
        while (条件不满足) {
            condition.await();  //使线程处于等待状态
        }
        条件满足后执行的代码;
    } finally {
        lock.unlock();    //释放锁
    }
}

//通知线程的典型模式
public void conditionSignal() throws InterruptedException {
    lock.lock();    //获取锁
    try {
        完成条件;
        condition.signalAll();  //唤醒处于等待状态的线程
    } finally {
        lock.unlock();    //释放锁
    }
}
复制代码

假设现在有一个锁和两个等待队列:

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
复制代码

画图表示出来就是:

image_1c3hjlt3n14rl1i9g1epg3371ce16e.png-39.7kB

有3个线程maint1t2同时调用ReentrantLock对象的lock方法去竞争锁的话,只有线程main获取到了锁,所以会把线程t1t2包装成Node节点插入同步队列,所以ReentrantLock对象、AQS对象和同步队列的示意图就是这样的:

image_1c3hjmnj819nl57f11l11p7f9196r.png-94.5kB

因为此时main线程是获取到锁处于运行中状态,但是因为某个条件不满足,所以它选择执行下边的代码来进入condition1等待队列:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}
复制代码

具体的await代码我们就不分析了,太长了,我怕你看的发困,这里只看这个await方法做了什么事情:

  1. condition1等待队列中创建一个Node节点,这个节点的thread值就是main线程,而且waitStatus-2,也就是静态变量Node.CONDITION,表示表示节点在等待队列中,由于这个节点是代表线程main的,所以就把它叫做main节点把,新创建的节点长这样:

    image_1c3hs5hvfu3g186m1g8m9tjdg9cg.png-14.1kB

  2. 将该节点插入condition1等待队列中:

    image_1c3hs74l64m11n6eedc357acvct.png-118.9kB

  3. 因为main线程还持有者锁,所以需要释放锁之后通知后边等待获取锁的线程t,所以同步队列里的0号节点被删除,线程t获取锁,节点1称为head节点,并且把thread字段设置为null:

    image_1c3hs8phe1r1smrl12q41231hfuda.png-103.4kB

至此,main线程的等待操作就做完了,假如现在获得锁的t1线程也执行下边的代码:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}
复制代码

还是会执行上边的过程,把t1线程包装成Node节点插入到condition1等待队列中去,由于原来在等待队列中的节点1会被删除,我们把这个新插入等待队列代表线程t1的节点称为新节点1吧:

image_1c3hshhsik77531ribb6kv57e4.png-112.2kB

这里需要特别注意的是:同步队列是一个双向链表,prev表示前一个节点,next表示后一个节点,而等待队列是一个单向链表,使用nextWaiter表示下一个节点,这是它们不同的地方

现在获取到锁的线程是t2,大家一起出来混的,前两个都进去,只剩下t2多不好呀,不过这次不放在condition1队列后头了,换成condition2队列吧:

lock.lock();
try {
    contition2.await();
} finally {
    lock.unlock();
}
复制代码

效果就是:

image_1c3hsjumr5jb3c5cqhdk57tieh.png-127.6kB

大家发现,虽然现在没有线程获取锁,也没有线程在锁上等待,但是同步队列里仍旧有一个节点,是的,同步队列只有初始时无任何线程因为锁而阻塞的时候才为空,只要曾经有线程因为获取不到锁而阻塞,这个队列就不为空了

至此,maint1t2这三个线程都进入到等待状态了,都进去了谁把它们弄出来呢???额~ 好吧,再弄一个别的线程去获取同一个锁,比方说线程t3去把condition2条件队列的线程去唤醒,可以调用这个signal方法:

lock.lock();
try {
    contition2.signal();
} finally {
    lock.unlock();
}
复制代码

因为在condition2等待队列的线程只有t2,所以t2会被唤醒,这个过程分两步进行:

  1. 将在condition2等待队列的代表线程t2新节点2,从等待队列中移出。

  2. 将移出的节点2放在同步队列中等待获取锁,同时更改该节点的waitStauts0

这个过程的图示如下:

image_1c3hsv52u8i64rgsdo2t1lngeu.png-119.3kB

如果线程t3继续调用signalAllcondition1等待队列中的线程给唤醒也是差不多的意思,只不过会把condition1上的两个节点同时都移动到同步队列里:

lock.lock();
try {
    contition1.signalAll();
} finally {
    lock.unlock();
}
复制代码

效果如图:

image_1c3hthb14i21a58i5168p1a1bfb.png-98.9kB

这样全部线程都从等待状态中恢复了过来,可以重新竞争锁进行下一步操作了。

以上就是Condition机制的原理和用法,它其实是内置锁的wait/notify机制在显式锁中的另一种实现,不过原来的一个内置锁对象只能对应一个等待队列,现在一个显式锁可以产生若干个等待队列,我们可以根据线程的不同等待条件来把线程放到不同的等待队列上去Condition机制的用途可以参考wait/notify机制,我们接下来把之前用内置锁和wait/notify机制编写的同步队列BlockedQueue显式锁 + Condition的方式来该写一下:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockedQueue<E> {

    private Lock lock = new ReentrantLock();

    private Condition notEmptyCondition = lock.newCondition();

    private Condition notFullCondition = lock.newCondition();

    private Queue<E> queue = new LinkedList<>();

    private int limit;

    public ConditionBlockedQueue(int limit) {
        this.limit = limit;
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) throws InterruptedException {
        lock.lock();
        try {
            while (size() >= limit) {
                notFullCondition.await();
            }

            boolean result = queue.add(e);
            notEmptyCondition.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public E remove() throws InterruptedException{
        lock.lock();
        try {
            while (size() == 0) {
                notEmptyCondition.await();
            }
            E e = queue.remove();
            notFullCondition.signalAll();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
复制代码

在这个队列里边我们用了一个ReentrantLock锁,通过这个锁生成了两个Condition对象,notFullCondition表示队列未满的条件,notEmptyCondition表示队列未空的条件。当队列已满的时候,线程会在notFullCondition上等待,每插入一个元素,会通知在notEmptyCondition条件上等待的线程;当队列已空的时候,线程会在notEmptyCondition上等待,每移除一个元素,会通知在notFullCondition条件上等待的线程。这样语义就变得很明显了。如果你有更多的等待条件,你可以通过显式锁生成更多的Condition对象。而每个内置锁对象都只能有一个相关联的等待队列,这也是显式锁对内置锁的优势之一

我们总结一下上边的用法:每个显式锁对象又可以产生若干个Condition对象,每个Condition对象都会对应一个等待队列,所以就起到了一个显式锁对应多个等待队列的效果

AQS中其他针对等待队列的重要方法

除了Condition对象的awaitsignal方法,AQS还提供了许多直接访问这个队列的方法,它们由都是public final修饰的:

public abstract class AbstractQueuedSynchronizer {
    public final boolean owns(ConditionObject condition)
     public final boolean hasWaiters(ConditionObject condition) {}
     public final int getWaitQueueLength(ConditionObject condition) {}
     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {}
}
复制代码
方法名 描述
owns 查询是否通过本AQS对象生成的指定的 ConditionObject对象
hasWaiters 指定的等待队列里是否有等待线程
getWaitQueueLength 返回正在等待此条件的线程数估计值。因为在构造该结果时,多线程环境下实际线程集合可能发生大的变化
getWaitingThreads 返回正在等待此条件的线程集合的估计值。因为在构造该结果时,多线程环境下实际线程集合可能发生大的变化

如果有需要的话,可以在我们自定义的同步工具中使用它们。

题外话

写文章挺累的,有时候你觉得阅读挺流畅的,那其实是背后无数次修改的结果。如果你觉得不错请帮忙转发一下,万分感谢~ 这里是我的公众号「我们都是小青蛙」,里边有更多技术干货,时不时扯一下犊子,欢迎关注:

小册

另外,作者还写了一本MySQL小册:《MySQL是怎样运行的:从根儿上理解MySQL》的链接 。小册的内容主要是从小白的角度出发,用比较通俗的语言讲解关于MySQL进阶的一些核心概念,比如记录、索引、页面、表空间、查询优化、事务和锁等,总共的字数大约是三四十万字,配有上百幅原创插图。主要是想降低普通程序员学习MySQL进阶的难度,让学习曲线更平滑一点~ 有在MySQL进阶方面有疑惑的同学可以看一下:

关注下面的标签,发现更多相似文章
评论