JUC解析-AQS(1)

2,634 阅读11分钟

AQS是AbstractQueuedSynchronizer的简称,翻译过来就是抽象队列同步器,从字面意思上理解:

  • 抽象:提供了一种定义的方式,但是不包括具体的实现
  • 队列:使用了基于队列的方式实现
  • 同步:实现了同步的功能

其实关于AQS的解析,在google上有很多的分析文章,既有侧重源码分析的,也有侧重原理的,在这里我想从一个其他的角度来试着分析下这个概念。

同步器的设计

在具体分析之前,我们先解释两种同步的方式,独占模式共享模式

  • 独占模式:资源是独占的,一次只能一个线程获取。
  • 共享模式:同时可以被多个线程获取,具体的资源的个数可以通过参数指定。

如果我们自己实现一个同步器的框架,我们怎么设计呢?下面可能是我们想到的比较通用的设计方案(独占模式):

  1. 定义一个变量int state=0,使用这个变量表示被获取的资源的数量。
  2. 线程在获取资源前要先检查state的状态,如果为0,则修改为1,表示获取资源成功,否则表示资源已经被其他线程占用,此时线程要堵塞以等待其他线程释放资源。
  3. 为了能使得资源释放后找到那些为了等待资源而堵塞的线程,我们把这些线程保存在FIFO队列中。
  4. 当占有资源的线程释放掉资源后,可以从队列中唤醒一个堵塞的线程,由于此时资源已经释放,因此这个被唤醒的线程可以获取资源并且执行。

其实上面也是AQS的设计方案,在这里我们就按照上面的几点来详细的介绍下AQS是怎么做的,我们先介绍下独占模式下的具体实现

AQS的数据成员

在AQS中定义的成员变量:

  1. state 作用与1条中的变量相同。
  2. headtail,定义了上面第4条中的FIFO队列,head和tail分别指向队列的头部和尾部,这里的队列是一个双向的队列。
  3. 定义Node节点,队列中每个元素都是一个Node节点,节点中包含了堵塞的线程以及前驱和后继节点的指针。
  static final class Node {
      //标记一个结点(对应的线程)在共享模式下等待
      static final Node SHARED = new Node();
      // 标记一个结点(对应的线程)在独占模式下等待
      static final Node EXCLUSIVE = null; 
      
      // waitStatus的值,表示该结点(对应的线程)已被取消
      static final int CANCELLED = 1; 
      //waitStatus的值,表示后继结点(对应的线程)需要被唤醒
      static final int SIGNAL = -1;
      //waitStatus的值,表示该结点(对应的线程)在等待某一条件
      static final int CONDITION = -2;
      /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
      static final int PROPAGATE = -3;
      
       // 等待状态,取值范围,-3,-2,-1,0,1
      volatile int waitStatus;
      volatile Node prev; // 前驱结点
      volatile Node next; // 后继结点
      volatile Thread thread; // 结点对应的线程
      Node nextWaiter; // 等待队列里下一个等待条件的结点

      //成员方法忽略,可以参考具体的源码
  }

说明:上面的数据结构的成员一定要理解,这个是理解其具体实现原理的基础。

获取资源

  //arg是要获取的资源的个数,在独占模式下始终为1
  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
    }

说明:

  1. 首先尝试获取资源,调用tryAcquire,查看源码可以发现这个成员函数直接抛出了异常,没有具体的实现方式,这就是前面说的抽象的原因,对于不同的同步方式,获取资源的方式不同,即tryAcquire的实现不尽相同。
  2. 如果获取资源失败,则将这个线程的相关信息插入队列中,具体插入队列的实现是由addWaiter(Node.EXCLUSIVE)实现的,参数Node.EXCLUSIVE表示我们要插入的Node是独占式的,具体的实现:
   private Node addWaiter(Node mode) {
        //生成该线程对应的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        //将Node插入队列中
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    //将Node节点插入到线程堵塞的队列中
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

上面的两个函数比较好理解,就是在队列的尾部插入新的Node节点,但是需要注意的是由于AQS中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过CAS自旋的方式保证了操作的线程安全性,需要说明的是通过这种方式多个线程同时插入节点,不能保证节点的顺序。

  1. 对队列进行调整,调整的原因是因为有些线程可能临时的取消而退出,这样在队列中间会穿插着很多无效的线程节点,因此可以在2步骤操作结束后,对队列进行调整,使得新插入的节点能排在有效的节点的后面,具体的实现:
   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //线程唤醒后将接着从这里开始执行
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

说明:

  • 对于新插入的节点,如果他的前驱节点是head节点,那可以尝试再次tryAcquire去获取资源,如果获取成功,则将这个节点设置为新的head节点,在这里需要理解,为什么开始获取资源失败,节点插入队列后再次获取资源就可以成功呢?因为队列是存在多线程操作的,第一次获取资源的时候,资源可能正在被其他的线程占有因此获取失败,但是在插入的过程中,资源被释放掉,这时候再去获取资源即可以成功,而且资源的获取总是从队列的头部节点开始依次往后面执行的,如果不满足上面的条件,则对这个节点进行调整,具体的逻辑:
   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果有效,则设置为SIGNAL,保证在该节点获取到资源后能通知后续的节点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • 从上面的逻辑可以看出,新插入的节点要调整到未取消的节点的后面(在前面Node节点的定义中,会发现如果线程取消,则对应节点的waitStatus会设置为CANCELLED,因此有效的节点的waitStatus < 0)。 节点的位置调整结束后,该线程将会陷入堵塞的状态,具体逻辑:
    private final boolean parkAndCheckInterrupt() {
        //该函数调用了unsafe.park函数堵塞,在前面的系列文章中有对unsafe介绍过
        LockSupport.park(this);
        return Thread.interrupted();
    }

到此,一个线程获取资源的逻辑介绍完了,下面我们介绍下资源的释放过程

资源释放

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
        Node h = head;
        // waitStatus等于0,说明后面没有要释放的线程结点
        if (h != null && h.waitStatus != 0) 
          // 唤醒等待队列里下一个结点对应的线程
          unparkSuccessor(h);
        return true; // 成功
      }
    return false; // 失败
  }
  1. 与资源的获取类似,tryRelease也没有具体的实现,对于不同的同步方式,可以有不同的实现,这里,如果资源释放成功,则唤醒队列中的下一个节点。
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  1. 如果node节点为null或者已经被取消,则从后往前遍历队列,找到最前面的一个有效的节点,然后调用unpark唤醒,这里之所以node节点会变为null,也是因为多线程操作的原因。

AQS类实现其实是一个无锁的线程安全类实现,在无锁的情况下,主要是通过CAS和自旋的方式实现的线程安全,并且在代码中会有很多的条件判断,这也是因为在多线程的情况下每个数据都有可能发生变化,在阅读代码的时候要把握这点!!

线程被唤醒后,会接着前面分析的acquireQueued函数中睡眠的地方执行自旋判断,如果条件成立则获取资源成功,并且重置head节点,此时一个资源的释放操作完成。

共享模式介绍

共享模式与独占模式比较类似,不同的是资源不是唯一的,因此可以被多个线程获取,由于不同的线程获取资源的个数不同,因此在线程的唤醒逻辑上,可能会存在一个线程释放资源,但是会唤醒多个等待线程的情况,举例如下:

存在3个资源,4个线程,其中线程1占用了3个资源,线程2需要两个资源,线程3和线程4分别需要1个资源,但是由于资源不足,因此线程2、3、4分别堵塞在队列中,当线程1资源释放后,会有三个空闲的资源,因此会有两个其他的线程会按照队列中的顺序依次获取到资源,即一个线程释放资源,多个线程获取资源。

前面都比较类似,不同的逻辑如下:

    private void doAcquireShared(int arg) {
        //生成共享节点并且插入队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //返回值代表剩余的资源数量
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //设置新的头结点,并且可能会继续唤醒后继的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //如果条件不满足,则调整节点在队列中的位置并且睡眠
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

说明:如果节点是head的后继节点,则尝试获取资源,这个和独占模式是相同的,不同的由于共享模式的资源数量不唯一,节点获取资源后可能还有剩余的资源,如果有则进一步的唤醒后继的节点,具体逻辑如下:

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        //propagate表示剩余的资源,如果>0表示会尝试继续唤醒后继的节点
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

说明: 在有剩余的资源或者原头结点满足条件的情况下,会触发后继的节点释放操作。

    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                //在没有新节点插入的情况下,该节点的默认状态为0,此时将状态设置为PROPAGATE主要是保留了后继节点唤醒的状态,等新节点接入的时候继续唤醒后继节点
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }

说明:在一个线程获取到资源成功并且还有剩余资源的情况下,该线程将尝试接着调用setHeadAndPropagate释放其他的线程,有一点需要特别说明下:

在doReleaseShared中,要想唤醒后继的节点,必须保证head节点的waitStatus为Node.SIGNAL,这个是怎么保证的呢?其实不需要保证的,当一个节点调用setHeadAndPropagate成为新的head节点的时候,默认的waitStatus为0,此时如果有新的节点插入队列,则该头节点状态会被修改为Node.SIGNAL,满足唤醒的条件。如果没有新节点接入,在doReleaseShared中会把head节点的waitStatus设置为Node.PROPAGATE,此后如果新节点插入后,对队列调整shouldParkAfterFailedAcquire会失败,会触发新的一轮检查,此时发现其前驱节点已经是head节点了,这时候就可以获取资源了,这个也就是引入Node.PROPAGATE标志的原因,这里比较抽象,需要好好理解下~

到此为止,AQS关于资源获取和释放的部分已经分析结束,此外在类中还有Condition相关的部分,本来打算在一篇文章中写完,现在发现文章有点太长了,读起来可能比较吃力,因此Condition的部分就放在第二部分分析吧。