阅读 728

初步了解AQS是什么(一)

了解AQS

简介

AbstractQueueSynchronized的缩写,也叫抽象的队列式同步器。定义了一套多线程访问共享资源的同步器框架。

字如其名,他是一个抽象类,所以大部分同步类都是继承于它,然后重写部分方法即可。

比如说ReentrantLock/Sema

phore/CountDownLatch都是AQS的具体实现类。

功能

AQS维护了一个共享资源State和一个FIFO的等待队列,当有多个线程争抢资源的时候就会阻塞进入此队列。

线程在争抢State这个共享资源的时候,会被封装成一个Node节点,也就是说在AQS的等待队列里面的元素都是Node类型的对象。

PS:阻塞队列中,不包括Head节点。

在了解AQS之前,我们先了解下Node内部是怎样的,我们先来看看源码

static final class Node {
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    //通俗的话来说就是,如果A节点被设置为SIGNAL,假如B是A的后继节点,那么B需要依赖A节点来唤醒才能拿到锁
    static final int SIGNAL    = -1;
   
    // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
    static final int CONDITION = -2;
    
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // =====================================================


    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值大于0代表此线程取消了等待,
    // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是本线程
    volatile Thread thread;

}
复制代码

结构

//头结点,当前持有锁的线程
private transient volatile Node head;

//尾节点,每次有新的节点进来,都要放在尾节点后面
private transient volatile Node tail;

//当前锁的状态,值为0的时候,表示共享资源没有被占用,1的时候表示有一个线程占用,如果大于1则表示重入
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; 
复制代码

因为AQS只是提供了一个模板,那么具体资源的获取方式和释放方式就由具体的实现类来决定。

下面我们跟着看源码一起看看AQS到底是什么东西

对于State的访问,AQS定义了以下3种方式

  1. getState()
  2. setState()
  3. compareAndSetState()

AQS定义了两种访问资源的方式

  1. 独占模式 ,也就是说只有一个线程可以访问资源,如ReentranctLock
  2. 共享模式,表示可以有多个线程访问资源,如Semaphore/CountDownLatch

上面我们说过,具体的同步实现器就是实现资源state的获取和释放的方式就好了,关于队列的维护,Node节点的入队出队或者获取资源失败等操作,AQS已经实现好

自定义的同步器只要实现以下方法,就可以实现出不同的同步器

  • 独占模式
    1. tryAcquire(int),尝试获取资源,获取成功的话返回true,否则false
    2. tryRealease(int),尝试释放资源,释放成功的话返回true,否则false
  • 共享模式
    1. tryAcquireShared(int),尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    2. tryReleaseShared(int),尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

PS:以上的方法在AQS上是没有实现的,只有在具体的同步类实现器才会实现。

独占模式的源码分析

以独占模式的ReentractLock的公平锁为例子

加锁

其实在每个具体的同步类(独占模式)的操作资源的接口中,最终调用的是AQS的acquire方法(比如说ReentractLock的公平锁)

所以我们看acquire的方法具体是怎么实现,至于其他不同的同步器的方法调用,也差不多都理解了。

acquire的源码

关于解释都在代码里面了

 public final void acquire(int arg) {//arg = 1,表示同步器想要1个state资源
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
// tryAcquire,顾名思义,就是先尝试获取一下,如果获取成功,就返回true,那么获取资源也就结束了,否则,就把当前线程设置为独占模式(EXCLUSIVE),压到阻塞队列中。
// addWaiter就是把当前线程封装成Node对象,并且设置为独占模式(EXCLUSIVE),加入阻塞队列


复制代码

下面继续看tryAcquire的源码

tryAcquire的源码

注意:这里用ReentranctLock只是为了方便举例子,不同的同步器实现不同的方法而已.

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
     		//获取资源
            int c = getState();
     		//如果c为0,说明资源没有线程占用,则可以去抢
            if (c == 0) {
                //既然是公平锁,那么肯定就讲究先来后到
                //hasQueuedPredecessors先看看前面有没有Node节点在等待,如果没有,就通过CAS去获取一下
                //在这里存在着线程竞争,所以有可能成功有可能失败,如果成功获得资源,那么compareAndSetState返回true,否则false
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //到这里说明前面没有线程在等待并且成功抢占到临界资源
                    //所以就设置当前线程为占有资源的线程,方便后面判断重入
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                //到这里说明是重入的问题
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
     		//到这里就是临界资源被占用,而且不是重入的情况,也就是说head节点都还没释放资源!
            return false;
        }
复制代码

下面继续看addWaiter的源码和acquireQueued的源码

addWaiter源码

 private Node addWaiter(Node mode) {//传入的是Node.EXCLUSIVE
     	//将当前线程封装成Node对象,并设置为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
     	//找到尾节点
        Node pred = tail;
     	//如果找到队尾,说明队列不为空(如果只有head,其实队列式为空)
        if (pred != null) {
            //把队尾设置为插入节点的前缀节点
            node.prev = pred;
            //通过CAS操作,将传入的线程放到队尾,这里用CAS操作,是因为此时可能会有多个线程插入队尾,所以在此时队尾元素是不太确定的
            if (compareAndSetTail(pred, node)) {
                //进入这里,说明当前队尾元素就是当前线程,设置前缀节点就好了!
                pred.next = node;
                return node;
            }
        }
     	//如果代码执行到这一步,说明有两种情况
     	//1. 现在队列为空
     	//2. 将当前线程代表的节点插入队列的时候,有其他线程也要插入该队列并且成功成为队尾元素.
        enq(node);
        return node;
    }


/*enq函数------------------分界线------------------------------------*/

 private Node enq(final Node node) {//传入的是当前线程所代表的节点
        for (;;) {//自旋
            Node t = tail; //找到队尾元素
            if (t == null) { // Must initialize
                //到这里代表队列为空,那么通过CAS操作加入头结点,此时还是可能有多个线程会跑到这里竞争
                if (compareAndSetHead(new Node()))
                    /*
                    到这里说明当前线程设置head成功(竞争成功),注意,这个head是直接新建的,此时waitStatus == 0(到后面会说)	
                    虽然设置了head,tail还是null,所设置一下tail,让它不为null,方便下次for循环执行else语句从而进行将当前线程代表的节点设置在head后面,自己跟着思路走一下。
                    */
                    tail = head;
            } else {
                /*
                到达这里也要分下情况
                1. 	是addWaiter想把当前线程加入队尾失败的时候
                2.  是上个if语句设置head节点成功之后,下一次for循环了
                不过上面的两种情况,都是想要把当前线程设置为队尾节点,也是通过CAS操作。因为此时也是有多个线程竞争的,如果成功就设置成功,如果失败就自旋操作,不断地尝试设置为队尾节点。
                */
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

复制代码

通过上面的简要分析,我们知道addWaiter最终的结果就是返回一个插入队尾或者head后面的节点。接下来acquireQueued就是插入队列的线程进行等待,如果轮到这个线程去拿资源了就去拿。相当于在饭堂排队打菜一样。一个窗口只能为一位学生打菜,没轮到打菜的同学可以休息做其他事情。

acquireQueued源码

解释一下:如果acquireQueued函数返回true,则表示会进入中断处理,不会进行挂起,也就是说打菜的同学不会休息,所以一般是返回false的

 final boolean acquireQueued(final Node node, int arg) {//传入的是当前已经加入队尾的节点和想要获取的State
        boolean failed = true;
        try {
            boolean interrupted = false;//默认设置没有中断
            for (;;) {//这里也是自旋
                //获取传入这个节点的前驱节点
                final Node p = node.predecessor();
                
                /*
                tryAcquire
                如果p是head,也就是说当前这个线程的节点是阻塞队列的第一个节点,那么就去尝试获取state,毕竟这个是队列嘛,先来先到。有可能成功,也有可能失败。
                因为head节点表示的是当前正在拥有资源的线程,不知道能否成功是因为不知道head节点有没有释放资源,其实在ReentractLock的tryAcquire就是判断state是否为0,如果为0,则表示没有线程拥有该资源,也就是说head节点释放了该资源,那么即可获取。
                还有一个原因就是在enq的时候,如果队列没有节点,也就是初始化head节点的时候,没有设置任何线程,也就是说head没有占用资源,那么当前线程作为阻塞队列的对头,可以去尝试去获取state,万一得了呢?!
                */
                if (p == head && tryAcquire(arg)) {
                   //到这里是当前线程获取state成功,将当前节点设置为head节点,
                    setHead(node);
                    p.next = null; // help GC,让之前的head方便被JVM回收
                    failed = false;//表示获取state成功
                    return interrupted;//表示期间有没有被中断过
                }
                //到这里是说明 要么代表当前线程的节点不是阻塞队列的头结点  要么尝试获取state资源失败
                //不管是哪种情况,说明当前加入节点的线程想要知道自己此时的状态是什么,若是休息,但是谁告诉我下一次到我了?若不是休息,那么就找到可以休息的地方或者说到我打菜了。所以这里就用了waitStatus的变量表示
                //
                /*
                如果有A Node对象,直接排在A后面,队列是这样的 A<=>B<=>C
                1. 如果A的waitStatus =  -1 ,表示说A 如果占用了state资源,那么排队在A后面的第一个Node节点(B节点)可以先休息(B线程挂起)了,如果A释放了资源那么就会唤醒B,也就是A对B说,你先去休息吧,我好了就叫你
                2 .如果 A的waitStatu = 1,表示说A这个线程已经不想排队获取这个资源了,这里设置这个值主要是方便当前节点找到可以让它可以他安心休息的地方。
                3. waitStatu = 0 表示A是初始化状态
                */
                //这里是 主要是为了找到node可以休息的地方。如果找到就休息,如果找不到,那么说明node前面就是head了,下一次循环检查能不能获取资源就好了!
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


//---------------setHead源码-------------------
private void setHead(Node node) {
    	//将head指针指向传入的节点
        head = node;
    	//这里设置head节点的线程为null,同步实现器在实现tryAcquire成功的时候会把当前线程保存下来的
        node.thread = null;
    	//这里是当前node的前缀
        node.prev = null;
   }



//---------------
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//传入的是前驱节点和当前节点
     	//获取前驱节点的状态,方便当前节点的接下来的状态
        int ws = pred.waitStatus;
     	
        if (ws == Node.SIGNAL)
            //进到这里就说明waitStatus = -1,也就说明node应该可以休息了,也就是线程挂起
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //这里就是说前驱节点已经放弃排队了,当前节点往前找看看能不能找到没有取消排队的节点,看看能不能排在他们后面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //到这里很有可能是waitStatus为0的分支,上面我们都没有设置waitStatus
            // 我们在enq的时候用了new Node()和在addWaiter刚开始的时候也是用了 new Node(mode,arg),这两处都是添加tail的时候
            // 如果没有设置waitStatus的时候,是默认为0的,也就是说是初始化状态
            // 如果到达这里前驱节点都是tail,我们就要将队尾的状态设置为-1,让传进来的node节点可以找到休息点。或者是已经释放资源的head,那么下次node可以变为head了!!
            // 设置可能会失败,因为这里也会有线程竞争,竞争不过,这里也是通过自旋,直到能找到休息点为止。
            //也有可能是pre节点已经是head节点了,还没有释放state资源,此时pre(head)的waitStatus == -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
     	//这里为什么返回false呢,是因为如果这时候head已经是node的pre,那么如果到这里head已经释放完资源之后,node下一次就可以直接获取资源啦!如果head还没释放资源,那么下一次node就直接去休息。
     	// 如果返回的是true的话,如果这时候head已经是node的pre,head已经释放完资源,那么到后面线程节点就挂起,那么谁来唤醒node节点?
        return false;
    }


//--------------------------
 private final boolean parkAndCheckInterrupt() {
     //如果shouldParkAfterFailedAcquire返回的是true,若是false则不会执行到这里!
        LockSupport.park(this);//线程从这里挂起,如果被唤醒和中断那么继续从这里往下执行。
        return Thread.interrupted();
    }
复制代码

解锁

从上面的解释我们知道,如果挂起等待的线程需要获取资源,是需要前缀节点的waitStatus为SIGNAL的线程唤醒的,也就是head节点。

在独占模式中,具体的同步器实现类最终用到的是AQS的release方法,开始的时候说过了,具体的同步实现器只要实现tryRelease方法即可。

比如说ReentranctLock的unlock

release的源码

 public final boolean release(int arg) {
        if (tryRelease(arg)) {//这里是先尝试释放一下资源,一般都可以释放成功,除了多次重入但只释放一次的情况。
            Node h = head;
            //这里判断的是 阻塞队列是否还存在和head节点是否是tail节点,因为之前说过,队列的尾节点的waitStatus是为0的
            if (h != null && h.waitStatus != 0)
                //到这里就说明head节点已经释放成功啦,就先去叫醒后面的直接节点去抢资源吧
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

tryRelease源码

protected final boolean tryRelease(int releases) {
    		//对state的操作就是释放资源
            int c = getState() - releases;
    		//如果执行释放操作的不是所拥有资源的线程,抛出异常。
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
    		//判断是不是有嵌套锁
            if (c == 0) {
                //如果到达这里,说明临界资源已经获得自由啦,没有线程占用它啦!所以设置free = true
                free = true;
                //同时会把拥有资源的线程设置为null
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
复制代码

unparkSuccessor源码

private void unparkSuccessor(Node node) {//传入的是head节点
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
    	//这里设置一下head的waitStatus,因为之前除了有节点加入队列的时候会把head节点ws = -1,基本没有其他地方设置,所以这里基本都是为-1的,CAS设置为0主要是head后面的直接节点不会挂起等待。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    	//下面的代码是如果阻塞队列有取消等待的节点,那么就把他们移除阻塞队伍,找到真正想要获取资源在等待的head后面的直接节点。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //到这里就说明找到了那个节点,也就是head后面的第一个没有取消等待的节点,这个节点可能已经挂起或者还在挂起的过程中,反正都会执行唤醒线程的函数。这样如果是挂起的线程,就继续执行下一次自旋,下一次自旋肯定拿到锁,进行操作。因为已经满足了是1. 唤醒的节点是阻塞队列的第一个节点,2. head节点已经释放资源了!
            LockSupport.unpark(s.thread);
    }

复制代码

总结

  1. 本篇文章简述了AQS的大概作用和原理。
  2. 以ReentrantLock(独占模式)的公平锁为例子,分析了AQS的关于阻塞队列是怎么的操作。
  3. 写这篇文章主要是为了加强自己的关于多线程的基础。为了学习,看了许多篇大佬的博客文章,记录下来,加入了自己的理解,希望错误少一点。得以这些大佬为榜样,站在巨人的肩膀去学习,希望能够学得更快,更高效,希望以后也可以写出自己的高品质文章!

参考

  1. www.javadoop.com/post/Abstra…

  2. www.cnblogs.com/waterystone…

关注下面的标签,发现更多相似文章
评论