最好懂的AQS核心源码

398 阅读11分钟

最好懂的AQS核心源码

相必大家应该都在各个八股文里听过AQS吧。只要你简历里写上你会多线程,这个知识点几乎是必问的。

在阅读之前,读者希望你们对ReentrantLock和CountDownLounch()两个类有过使用经历。

如果中途有什么名词不能理解,可以去文章底部答疑区域寻找答案哟~

什么是AQS?

AQS的全称就是 AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,这是JUC包中的一个类。

这个类很关键,很多并发工具类都是基于这个类实现的,他也是JUC同步框架的基石。

实现类需要去继承该类,并重写指定方法就可以实现一套线程同步机制。

我们经常用到的ReentrantLock和CountDownLatch等也都是基于AQS实现的。

从生活角度理解AQS

AQS的设计就和取钱的场景很类似,1号,2号,3号选手都在银行取钱,柜台只有一个,因此同一时间只有一个人可以取钱。1号在取钱的时候,2号,3号都会在等待区等待。1号取完后,会通知2号,2号再去取。

AQS框架

AQS核心

img

AQS的核心只有两个东西:

  1. 一个用volatile修饰的state变量(最后有讲解)
  2. 一个CLH(三个人名缩写)的双向队列。

每一个线程获取锁的时候,就会试图去修改state变量,如果修改成功就能获取锁,如果失败,就自动加入双向队列的末尾,等待持有锁的线程对其进行释放。

这里就可以理解成银行取钱,只有一张票,获得票的人可以取钱,取完后,把这个票给另一个在等待的人,另一个人就能取钱了

其中,AQS对state的访问设置了三种方法:

  • getState()
  • setState()
  • compareAndSetState()

其中compareAndSetState 就是 CAS 法的思想体现。它结合了CAS自旋 volatile 变量(最后有讲解)。

两种资源共享方式

AQS定义了两种资源共享方式:Exclusive(独占的,仅一个线程可以执行,如ReentrantLock)和Share(共享的,如CountDownLatch)

不同自定义同步器争用共享资源的方式不同。在实现自定义同步器的时候,只需要实现state的释放和获取的方式即可。

像ReentrantLock,CountDownLatch都是jdk里实现了以下方法的自定义同步器

自定义同步器需要实现以下几种方法:

独占式(比如ReentrantLock)

  1. tryAcquire(int) : 独占的资源共享方式。尝试获取资源,成功返回true,失败返回false
  2. tryRelease(int) : 独占的资源共享方式。尝试释放资源,成功返回true,失败返回false

共享式(比如CountDownLatch)

  1. tryAcquireShared(int):共享的资源共享方式。尝试获取资源,负数表示失败,0表示成功,但没有剩余资源,正数表示成功,有剩余资源。
  2. tryReleaseShared(int):共享的资源共享方式。尝试释放资源。

ReentrantLock为例(独占)

以ReentrantLock为例,state初始化是0,表示未锁定状态。A线程调用ReentrantLock的lock()方法时,同时会调用tryAcquire()方法,抢占锁,并且将state+1。此后所有的线程去执行tryRelease方法都会失败,直到A线程调用ReentrantLock的unLock()方法,也就是调用ryRelease(int)方法,state会变成0,之后其他线程才能获得到锁。当然A线程在锁的时候,也是可以重复获取锁的,每重复获取一次,state+1,在释放的时候,获得多少次就要减多少次。

CountDownLatch为例(共享)

以CountDownLatch为例,当你在构造器里面传入一个参数n的时候,state也会被初始化成n。每个子线程countDown()一次,state就会通过CAS减1。当state=0的时候,就会把主调用线程unpark()掉,然后主调用线程也会从await()处返回,继续后续动作。

源码布局

下面我们要说的源码,以ReentrantLock为例,它的核心毫无疑问是lock和unLock方法,而这两个方法的底层则是AQS中的acquire和release方法。

本文只说明讲解独占式的核心源码

源码细究(acquire)

AQS最核心的地方就是 acquire(int) 方法了

public final void acquire(int){
    if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE,arg))){
        selfInterrupt();
    }
}

同时,它也是ReentrantLock 的lock()方法的底层

我们来对这个源码进行一个解读:

  1. tryAcquire()方法尝试直接获取资源,如果成功就返回。
  2. addWaiter()方法则是把该线程加入等待队列的队尾,并且标记为独占模式
  3. acquireQueued()让线程阻塞在等待队列里面获取资源,直到获取到资源才返回,如果在等待的过程被中断,就返回false,否则返回true。
  4. 如果获取资源失败了或者等待过程被中断过,则需要自我中断一下。

1.tryAcquire()

protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
}

我们不难发现,这个tryAcquire它啥都没干!

我们上文提到过,AQS只是一个框架,需要实现它的类去实现对应的关键方法,如果没有实现就会抛出异常,这里只是定义了一个接口。

具体资源的获取逻辑要由自定义的同步器去实现。至于是否能重入,就要看自定义同步器的的设计了。

2.addWaiter(Node mode)

这一步上述提到,主要就是对当前的节点进行初始化,并加入队列中等待。运用到了CAS的思想。

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失败则通过enq入队。
    enq(node);
    return node;
}
private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

不懂CAS自旋的读者,请看最下面的详解

3.acquireQueued(Node,int)

acquiredQueued中传入了刚刚被加入等待队列末尾的线程节点。线程进入这一步后,要做的事情也就是进入等待区休息,直到别的线程释放资源,将自己唤醒。

final boolean acquireQueued(final Node node,int arg){
    //标记是否成功拿到资源
    boolean failed = true;
    
    try{
        //标记是否在等待过程被中断过
        boolean interrupted = false;
        
        //自旋
        for(;;){
            //拿到当前节点的前驱
            final Node p = node.processor();
            //如果这个前驱是head节点,说明当前节点是第二位。说明head节点释放完资源唤醒当前节点
            if(p == head && tryAcquire(arg)){
                //如果尝试获取成功了,就将当前节点设置为头节点
                setHead(node);
                //将头节点的next指针设置为空,这样就能被垃圾回收了
                p.next = null;
                //成功获取到了资源
                failed = false;
                return interrupted;
            }
           //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){
                //等待过程被中断了,只要有一次,就标记为true
				interrupt = true;                
            }
        }
       
    }finally{
        //如果等待过程中timout,没有成功获取资源,就取消等待。
        if(falied){
            cancelAcquire(node);
        }
    }
}

接下来我们看一下这个过程中调用的两个函数shouldParkAfterFailedAcquire和parkAndCheckInterrupt吧

3.1.shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

总的来说,它干的事情就是:如果前驱节点状态不是SIGNAL,就去等待队列前面慢慢找,直到找到一个节点的waitStatus > 0的地方安家

3.2.parkAndCheckInterrupt()

如果线程找到地方安家了,就安心去休息了。

 private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);//调用park()使线程进入waiting状态
     return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
 }

4.小结

看完这些后,我们再重新看这个核心源码

public final void acquire(int){
    if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE,arg))){
        selfInterrupt();
    }
}

我们总结下流程:

  1. 自定义同步器自定义的tryAcquire()方法尝试获取资源,如果成功直接返回
  2. 如果没有成功就将线程加入等待队列尾部,标记为独占模式(addWaiter(Node.EXCLUSIVE,arg))
  3. acquireQueued()方法会让线程在等待队列里休息,有机会被unpark()唤醒尝试获取资源,如果等待过程被中断,返回true,否则返回false
  4. 如果线程被中断,就会获取资源后自我中断

这也是ReentrantLock.lock()的流程,其它的lock()源码也和这个大同小异。

源码细究(Release)

我们刚刚说了acquire操作,下面我们来讲讲它的反向操作release操作吧

这也是独占模式下,释放共享资源(state)的顶层入口。他会唤醒其它线程去获取资源,这也正是unlock()的语义

1.tryRelease(int)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

当然,这个tryRelease()方法也是要实现这个框架的类去自己实现的。

 protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
 }

2.unparkSuccessor(Node)

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

总的来说,这个函数用unpark()方法去唤醒等待队列里面没有放弃的线程

答疑

什么是volatile?

我们刚刚提到,这个state字段是用volatile修饰的。

Java内存模型里规定了,所有的变量都存储在主内存中,每条线程都有自己的工作内存,线程的工作内存中保留了被该线程用到的变量(这些变量拷贝自主存),线程间变量的传递需要由主存传递

这就好像,你和你的小伙伴被关在了不同的小黑屋里,这些小黑屋外面有个人看守,你和其它小伙伴需要通过这个看守人来通信。

笔者觉得这个思想也是借鉴了操作系统里线程通信的理念。

回到正题,什么是volatile?

举个例子:现在有两个线程。

第一个线程执行两个步骤:

int i = 0;
i = 10

第二个线程执行

j=i

由上述讲到的java内存模型,我们得知:第一个线程从主存中取得i的变量,写入工作内存,将 i 写成10.

然而,还没等第一个线程把 i 写回主存,主存中的 i 仍然是10,第二个线程就把这个i拿走了,那么最后结果 y = i = 0

这个过程,不难发现,j对于i的修改是不可见的。为了确保变量的可见性,我们需要volatile

volatile所修饰的变量能保证,所修改的变量回立刻更新到主存,感兴趣的同学可以去深入了解。

什么是CAS自旋 volatile 变量?

拿JUC包里的AtomicInteger类里面的常用的getAndIncrement方法举例:

public final int getAndIncrement() {
            for (;;) {
                  int current = get();  // 取得AtomicInteger里存储的数值
                  int next = current + 1;  // 加1
                  if (compareAndSet(current, next))   // 调用compareAndSet执行原子更新操作
                     return current;
            }
}

这里我们看到了compareAndSet方法(CAS)。

我们点进这个方法的源码,发现,compareAndSet()方法中调用的是sun.misc.Unsafe.compareAndSwapInt(Object obj, long valueOffset, int expect, int update)方法,compareAndSwapInt 是用native修饰的,说明它是基于的是CPU 的 原语,性能好

它使用的是基于冲突检测的乐观并发策略。我们都知道,乐观并发策略在线程数大的时候失败的概率也会上升。

什么是park()和unpark()?

park()和unPark()是LockSupport类里面的方法

//暂停当前线程
LockSupport.park();

//恢复某个线程
LockSupport.unpark(暂停线程对象);

举个例子就懂啦

Thread thread = new Thread(() -> {
      System.out.println("start.....");
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      System.out.println("park....");
      LockSupport.park();
      System.out.println("resume.....");

});
thread.start();
Thread.sleep(2000);
System.out.println("unpark....");
LockSupport.unpark(thread);

这段代码先开启一个线程,然后休眠,让主线程先暂停,之后休眠后,再让开启的线程暂停。

运行结果:

start.....
park....
unpark....
resume.....

参考文章:

www.cnblogs.com/waterystone…

www.cnblogs.com/shoshana-ko…

blog.csdn.net/lzh75441356…

javaguide.cn/java/concur…

juejin.cn/book/711642…

cloud.tencent.com/developer/a…

blog.csdn.net/lzh75441356…