阅读 21

初步了解AQS是什么(三)

前言

  1. 第一篇文章通过ReentranctLock的公平锁介绍了AQS的核心部分,第二篇文章通过Condition分析了独占锁是的具体的工作机制,还有公平锁和非公平锁的区别,也分析了中断机制在AQS的实现
  2. 本篇文章主要了解的是AQS的共享模式,有些内容需要前面两篇文章的内容,所以阅读之前可以先阅读之前的文章,这样效果更佳!
  3. 文章链接:<初步了解什么是AQS(一)><初步了解什么是AQS(二)>
  4. 本文章仅供个人学习用,希望大家可以互帮互助!

共享锁和独占锁的区别

独占锁

  1. 独占的,排他的
  2. 当某个线程拥有独占锁,其他线程只能等待这个线程释放这个独占锁才能去争取,并且同一时刻只能有一个线程可以争取独占锁成功

共享锁

  1. 锁是共享的,可以被多个线程同时拥有
  2. 如果一个线程拥有了这个锁,那么其他等待这个共享锁的线程可以去尝试获取锁,并且有很大几率获取成功

方法区别

独占锁 共享锁
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

由表格我们知道,除了最后一个doReleased()是共享锁独有之外,其他的方法独占锁和共享锁基本都是一一对应的。

在独占锁中,release方法中尝试释放锁,如果成功就unparkSuccessor(h)

在共享锁中,releaseShared方法中尝试释放锁,如果成功就doReleaseShared()

所以一般来说unparkSuccessor(h)doReleaseShared()一般是互相对应的,但是doReleaseShare()要执行的逻辑比前者多。这点我们到后面再看!

这里提一下:

我们之前说过

  • 在独占模式中,当一个线程获取锁之后,只有释放锁之后才会去唤醒下一个线程。

  • 在共享模式中,如果一个线程成功获取了共享锁,他就会马上唤醒下一个等待的线程,并不需要该拥有锁的线程释放锁才唤醒下一个线程,因为共享锁的设定就是为了让多个线程同时拥有所资源的!这里有两种情况会唤醒后面等待的线程 1. 当前线程成功拥有锁 , 2. 拥有锁的线程释放锁 . 希望读者记住这两点,这对于后面我们分析源码的时候有指导性的作用。

前面我们先说了独占锁和共享锁的区别,是为了让读者更好地去区分它们。下面我们就通过分析CountDownLatch的源码,去体会AQS中共享模式是如何工作的。

CountDownLatch

说明

CountDownLatch类是典型的AQS的共享模式使用,并且是一个高频使用的类。

关于CountDownLatch的用法,我们就不一一在这里赘述。如果有不了解的读者,可以先去百度一下再往下看源码分析,这样效果会更佳。

这里推荐一篇关于怎么用的博客,个人看的时候觉得比较通俗易懂

www.cnblogs.com/cuglkb/p/85…


源码分析

构造方法

需要传入一个不小于0的数,其实也就是设置AQS的共享资源

  public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }


//----------Sync是CountDownLatch的内部类
/**
我们这里说一下,我们看到Sync就是继承了AQS这个抽象类,然后重写了tryAcquireShared和
tryReleaseShared的方法,也就是自己加了个getCount()方法,所以后面看到Sync类调用的方法,除了重写那两个,其他的基本都是父类的方法,读者要时刻记住这一点!
**/
  private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count); //AQS
        }

        int getCount() {
            return getState();
        }

      	//重写AQS的方法
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		//重写AQS的方法
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
          
复制代码

我们可以看到,在构造方法里面我们就设置了AQS的state值。

所有调用了await()方法的线程就会等待挂起,然后另外的线程就会执行state = state -1 的操作。

当state的值为0的时候,那么将state减为0的线程就会马上唤醒之前调用了await()的线程,然后这些线程就会去获取共享锁啦!

在我们知道CountDownLatch的用法之后,我们需要注意它的两个方法,一个是CountDown(),另外一个是await()方法

countDown()每次会把state的值减1,直到state的值变为0,唤醒调用await()的线程

await()顾名思义,调用这个方法的线程会阻塞,直到被唤醒。

上面说的希望读者可以认真理解一下,理解到位了看下面的源码才不会半知半懂

示例代码

我们通过下面的源码,来分析countDownawait的源码

public static void main(String[] args) {
       CountDownLatch latch = new CountDownLatch(2);
    //t1和t2负责countDown,也就是将state减1
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(5000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(7000);

               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t2");

        t1.start();
        t2.start();

    // t3 和 t4负责await,等待state为0然后被唤醒
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + "从await回来了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }
        },"t3");

        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "从await回来了");
            }
        },"t4");

        t3.start();
        t4.start();
    }
复制代码

结果(t3和t4不是按照绝对的顺序输出的,后面的分析我们按照t3先入队)


下面我们就按照步骤来

latch先await()阻塞,等待被唤醒(等待state变为0),然后await()返回去做其他事情!

await源码

public void await() throws InterruptedException {
     	//调用sync的方法,关于Sync我们前面已经说过,这里就不多赘述了
        sync.acquireSharedInterruptibly(1);
    }


public final void acquireSharedInterruptibly(int arg) //AQS
            throws InterruptedException {
    	//这个方法是响应中断的有中断就抛出异常呗
        if (Thread.interrupted())
            throw new InterruptedException();
    	//t3和t4到这里的时候,肯定是小于0的(此时state = 2)
    	//所以先看看tryAcquireShared方法
    	if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


protected int tryAcquireShared(int acquires) { //重写AQS的方法
        //只有state>0的时候才会返回1
    	return (getState() == 0) ? 1 : -1;
        }

//从这个方法名我们知道,这个方法是获取共享锁并且是响应中断的
private void doAcquireSharedInterruptibly(int arg) //AQS
        throws InterruptedException {
    
    	//步骤1. 将当前节点设置为共享模式,然后加入阻塞队列!
    	//关于这些知识,在第一篇和第二篇文章就说过啦,不懂的回去看看吧!
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //这样也是一样,只要state > 0,就返回-1,(此时state = 2)
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步骤2. 因为是刚开始,所以不会进到前面那个if语句,所以会直接来到这
                //这里我们都很熟悉了,就是在阻塞队列找到一个地方让这个线程可以挂起!
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

我们把上面的步骤用图来描述一下大家就清楚啦!

t3经过步骤1之后,也就是入队之后,会变成如下图所示

然后tryAcquiredShared会返回-1,那么就执行shouldParkAfterFailedAcquire就会将t3的pre指向的节点的WaitStatus(下面统称为ws)置为-1,如下图所示。(t3的ws为-1)

执行完shouldParkAfterFailedAcquire之后执行parkAndCheckInterrupt就会把t3挂起啦!

然后后面t4进来的时候和t3是一样的步骤,到后面就是t4加入阻塞队列,然后把t3的ws置为-1,结果如下图所示。(t4的ws为-1)

接着,t4就就被挂起啦,现在t3和t4就已经全挂起了,就等待着被唤醒啦!


这里说明一下,因为可以是由多个线程调用countDown的,那么有些线程调用的时候state还不是0,所以这些线程不会唤醒await的线程,只有当某个线程调用了countDown方法,然后state从1变为0了,那么这个线程就唤醒其他await的线程!明白这点很重要。

接下来我们继续看countDown方法

countDown源码

public void countDown() {
        sync.releaseShared(1);
    }

public final boolean releaseShared(int arg) { // AQS
    	//只有将state设置为0的时候才会返回true,否则只是每次将state-1而已
        if (tryReleaseShared(arg)) {
            //到这里说明已经成功把state置为0了,那么就开始唤醒现在还在等待的线程啦!
            doReleaseShared();
            return true;
        }
    	//state本来就是0的话,肯定释放不了,就返回false啦
        return false;
    }

protected boolean tryReleaseShared(int releases) {// 重写AQS
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) 
                    return false;
                int nextc = c-1;
                //每次都是CAS将state-1
                if (compareAndSetState(c, nextc))
                    //如果state减1成功变为0的话,肯定就返回true啦!
                    return nextc == 0;
            }
        }


复制代码

下面我们来着重分析下doReleaseShared()这个方法

//此时state为0,先把流程走一遍先,先看注释即可,其他先不看
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //t3入队的时候,已经把head节点设置为-1啦,所以会到if语句里面
                if (ws == Node.SIGNAL) {
                    //将head设置为0,也就是恢复到初始状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到这里就是唤醒head节点的下一个节点,这时候也就是唤醒t3
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

在t3被唤醒之后,我们回到t3被唤醒的地方和接下来要做的事情

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//此时head就是t3的前缀啦,所以可以进来这里
                    int r = tryAcquireShared(arg);
                    //这里r > 0啦,因为之前说过state == 0的时候才会返回1
                    if (r >= 0) {
                        //那么此时t3就走到这一步了,下面我们看下这个方法
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步骤1:t3被唤醒之后就会从这里继续执行啦,假设没有中断的情况,那么是不会进到if语句里面的啦
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

此时t3会进入setHeadAndPropagate这个方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node); //步骤1:t3先把自己设置为头结点先
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //基本都会执行这个函数,那么这里也就说明在共享模式中,当节点被唤醒
                //的时候都会执行这个函数,也就是会唤醒下一个节点
                //那么这里就是说t3把自己设置为头部之后,就会马上去唤醒t4去抢锁啦!
                doReleaseShared(); 
        }
    }
复制代码

那么在这里,我们就好好分析一下doReleaseShared()这个方法了,所以在这里我们知道,t3已经是head节点了

//调用这个方法的时候我们已经知道此事state为0了
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            /**
            h == null 说明阻塞队列已经为空
            h == tail 说明阻塞队列刚被初始化的头结点
            
            还有一种情况就是
            之前是普通的节点,但是此节点已经是头结点,那么就是说该节点是已经被唤醒了,			 后面阻塞队列已经没有节点了
            所以上面两种情况都不需要被唤醒
            */
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //ws == -1,因为之前t4已经把t3设置为-1(SIGNAL)啦
                if (ws == Node.SIGNAL) {
                    //问题1:这里如果将节点设置为初始化状态失败的原因待会解释!
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到达这里,就说明t3的ws已经设置为0,然后t3就去唤醒t4啦!
                    //那么t4接下来被唤醒,就像t3被唤醒后做的事情差不多是一模一样,
                    //所以想分析t4接下来要干什么,看上面t3被唤醒之后做了什么就好了
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                   	//问题2:
                    //在我们举的例子中,t3是肯定不会来到这里的,这里主要判断的是
                    //如果有节点加入进来,但是这个节点准备到这里的时候把它的前驱节点
                    //就设置为-1了,关于这里,我们下面说明解释。
                    continue;                // loop on failed CAS
            }
            //此时h指向的还是t3那个节点,但是head节点可能已经变了
            //如果是同一个节点,退出循环,让被唤醒的线程去唤醒后面的线程。
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

下面我们来看一张图,就可以解释注释的内容了(希望大家可以好好看看这张图,方便理解后面的解释)

接着我们来解释一些内容

问题1

就是第一个CAS为什么会失败的,我们看到'唤醒cur的next节点'的流程,比如说t3唤醒t4之后,t3和t4此时就同时进行执行各自的代码了(这里我们把t3和cur对应,t4和next对应),所以在next被唤醒之后,就会把next节点设置为head节点,我们之前也说过,在setHeadAndPropagate里面有doReleased方法,所以会有下面的情况

  • cur节点唤醒next节点之后,cur由于某种原因阻塞了,next节点成功把自己设置为head节点,执行后面的操作,但是还没到第一个CAS.此时cur又继续往下走,判断此时的h已经不是执行head节点了,那么就会指向新的head,也就是此时的next节点.然后cur也继续往下走.所以现在cur和next这两个节点都是同时执行doReleased方法的,所以也会有可能同时到达第一个CAS,所以此时肯定只有一个线程CAS操作成功啦,另外一个不成功的就根据情况往下走了!这里举几个例子。

    比如说有t3->t4->t5->t6>......tn

    (1)把t3和t4分别对应前面说的那个cur和next,那么此时假设是t3唤醒t5,但t5还没有把自己设置为head节点,那么head节点仍然t4。又因为t4之前CAS是失败的,所以会自旋一次,然后执行最后一个if语句的时候,会直接退出啦!t3执行最后一个if的时候,由于t5还没有把自己设置为head,所以此时t3也退出啦。那么就说明t5以后的节点没人唤醒了吗?非也,因为t5成功设置自己为头结点的话,也会执行doReleased()方法啦!就由t5和它的后继去唤醒吧!

    在(1)的中,如果t3唤醒t5之后,t5也成功设置自己为头结点了,那么t5此时会可能会唤醒后面的,这里先不考虑。然后t3和t4又指向新的head节点啦。这种情况又开始循环啦。

    这里说明一下,t3和t4之中只能有一个CAS成功,另外一个就continue再次自旋啦,所以是t4唤醒t5,那么t3就像(1)说的t4一样啦。

我们看到在,在setHeadAndPropagate里面有doReleased方法,让后面的节点可以更快地唤醒,增大唤醒的吞吐量,不得不说设计者的厉害之处啊!

问题2:

在解释问题2之前,我们通过下面的步骤慢慢分析

假设这是某一情况阻塞队列的情况

  1. 然后t3被唤醒之后,此时阻塞队列可能没有节点了(阻塞队列不包含head节点)

  1. 我们在流程图里面说过,可能之前await的节点可能在addWaiter之前阻塞了,这时候可能又不阻塞了,然后执行了addWaiter方法,但是没有执行shouldParkAfterFailedAcquire,也就是t3没有把t2的ws设置为SIGNAL,所以就可能在执行if (h != null && h != tail)之前会有如下图所示

  1. doReleased函数中,我们已经把ws赋值为t3的ws,也就是ws为0,那么在2的前提下,我们来到这段代码else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),我们很容易知道,ws为0,那么这次CAS有可能失败,为什么呢?在2 我们说过shouldParkAfterFailedAcquire可能还没执行,那么也有可能CAS之前,就已经执行了这个函数,也就是此时t3的ws为-1了,那么此时肯定CAS失败啦!如果失败的话,就让t3再回去检查下状态,看看是否可以唤醒后继节点啦,所以在else if之后就是让它continue啦!

  2. 如果CAS成功,那么就是说shouldParkAfterFailedAcquire这个函数还没有执行,所以在执行最后一个if语句的时候t3可能退出啦,那么t4继续执行shouldParkAfterFailedAcquire,CAS把t3设置为-1之后,就去执行到setHeadAndPropagate这个方法,那么也会执行doRelease,t4的使命就结束了!

经过上面4点的说明,我们对问题2的分析就已经很清楚啦!

扩展

在解决问题2的时候,我们如果把else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))代码去掉的话,会有什么影响么?分析一下!

假如没有这段代码,假设此时阻塞队列如下所示

假设这是t1唤醒t2,但t2还不是head节点,那么在t4入队的时候,因为我们假设没有那个else if语句嘛,所以会执行shouldParkAfterFailedAcquire,那么将t2的wsCAS为-1,此时head可能已经指向t2了,前面我们也说过,在共享模式中,只要一个节点成为头结点,就会执行doReleased方法,所以在t2设置为head节点的时候,可能在t4休眠之前就被t2park了,但是这个操作时可以允许的,当我们unpark一个并没有被park的线程时,该线程在下一次调用park方法时就不会被挂起,而这一行为是符合我们的场景的——因为当前的共享锁处于可获取的状态,后继的线程应该直接来获取锁,不应该被挂起。

CyclicBarrier

说明

下面我们来说下CyclicBarrier,和CountDownLatch差不多,CyclicBarrier 基于 Condition 来实现。所以如果没了解过Condition的话,建议大家看下前一篇文章初步了解AQS是什么(二)

啥都不说,开局一张图,(引用别人大佬画的图)

源码分析

下面我们开始分析咯,依然await是最重要的方法

我们先看下一些细节

public class CyclicBarrier {
    //CyclicBarrier 是可以重复使用的,每次从开始使用到穿过栅栏当做"一代",或者"一个周期"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
   
    //这里看出是基于Condition的,所以等待线程的条件就是所有线程都在栅栏上await啦
    private final Condition trip = lock.newCondition();
    
    //参与跨过线程的线程数
    private final int parties;
    
    /* 如果设置这个,那么跨过栅栏之前需要执行的操作*/
    private final Runnable barrierCommand;
    
    /** 当前所处的代 */
    private Generation generation = new Generation();
复制代码

下面我们看看如可开启新的一代吧

   private void nextGeneration() {
        // 需要唤醒所有等待在栅栏上的线程
        trip.signalAll();
        
        count = parties;
       	//开启新的一代!
        generation = new Generation();
    }
复制代码

基本可以看出,我们开启新的一点,差不多和重写生成一个CyclicBarrier差不多

我们接下来看下如何打破一个栅栏

 private void breakBarrier() {
    	//设置标志位
        generation.broken = true;
     	//重新设置count的值
        count = parties;
     	//唤醒所有在等待的线程
        trip.signalAll();
    }
复制代码

现在已经有了铺垫了,那么我们来看下await方法吧

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //这里我们先获得锁呗,在finally后面再释放锁
        //因为在Condition中,await的线程需要先获得锁的啦!
        lock.lock();
        try {
            final Generation g = generation;
			//先检查栅栏是否有被打破,被打破就抛出异常呗
            if (g.broken)
                throw new BrokenBarrierException();
			//检查中断,有则抛出
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			//index 就是这个返回值
            //index 存储当前还有多少count
            int index = --count;
            //如果index 为 0,那么就说明所有线程在栅栏啦,那么就准备打破然后下一代啦
            if (index == 0) {  // tripped
                //用来标记barrierCommand在执行的时候有没有发生异常
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        //执行barrierCommand的操作
                        command.run();
                    ranAction = true;
                    //开启下一代啦,这里建议看下源码!
                    nextGeneration();
                    return 0;
                } finally {
                    //如果barrierCommand执行的时候发生过异常,那就打破栅栏呗!
                    //这里也建议看下源码!
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //如果到达这里,说明上面的index不为0,救是说不是最后一个线程到达栅栏的
            for (;;) {
                try {
                    //这里是没有超时机制的
                    //到这里也就是说到达栅栏了,那么就等待呗,所以就调用Condition的
                    //await,等待最后一个线程,然后被唤醒(Condition的signal)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果到达这里,就说明线程在await的时候被中断了
                    if (g == generation && ! g.broken) {
                        //到这里就打破栅栏呗
                        breakBarrier();
                        //打破栅栏之后,抛出异常信息,让外层自己去处理
                        throw ie;
                    } else {
                        //这里就是说g != genneration,就是说前面已经开启一个新的
                        //时代啦,说明最后一个线程await完成,所有线程就被唤醒啦!
                        //但是这个中断已经没有意义啦,所以记录下就好!
                        Thread.currentThread().interrupt();
                    }
                }
				//如果栅栏被打破,这里的被打破就是之前执行barrierCommand的时候
                //发生异常,那么被就抛出异常啦
                if (g.broken)
                    throw new BrokenBarrierException();

                //到这里基本都要退出啦
                //因为之前barrierCommand在执行完任务之后,就会 nextGeneration
                //开启一个新的时代,然后释放锁,等待的线程都是await到这里的,所以肯
                //定await之前的时代和被唤醒后的时代不一样啦!
                //如果之前的栅栏破了或者await的时候被唤醒了,都是在前面就抛出异常
                //都会直接返回啦!
                if (g != generation)
                    return index;
				//这里是超时的操作了,就不做分析了
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
复制代码

上面的await方法我们已经说得已经很清楚啦,我们接下来看看如何获取在栅栏等待的线程数

 public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //也就是运算一下就好啦!
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
复制代码

检查栅栏有没有被打破,其实也就是看看那个标志位有没有被设置就好啦!

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
复制代码

下面我们来总结一下什么时候栅栏会被打破吧!

  1. 如果在await的线程被中断,那么就会打破栅栏,然后抛出InterruptedException的异常啦
  2. 如果在指定的操作被抛出异常的时候,也会打破栅栏!

我们再来看下重置栅栏的源码

 public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
复制代码

跟简单,就是打破栅栏,然后重建栅栏!也就是不破不立!

我们来假设一个情况,加入parties设置为4,此时有3个线程await了,那么在第4个await之前重置

那么首先打破栅栏嘛,那么就会跑出BrokenBarrierException 异常,然后开启新的一代,再重置CyclicBarrier 的count和generation,一切从0开始!

关于AQS的核心功能的源码分析就到此结束啦!继续征战下一个知识!

总结

  1. 本片博客重点分析了CountDownLatch的源码,在分析的过程中,有两个问题迷惑了自己很久,但是自己在尝试跳出局部思维之后,发现有些问题可以迎刃而解,也对AQS共享模式的工作机制有了一定的了解所以希望以后可以保持这种思维。
  2. 在分析CountDownLatch的源码过程中,自己起码可以耐下心去琢磨,希望以后可以更加锻炼阅读优秀源码的能力吧
  3. 在写本博客的时候参考了别人的想法和内容,希望自己可以早日独立写出优秀的文章!!!!!
关注下面的标签,发现更多相似文章
评论