Java并发——AbstractQueuedSynchronizer(AQS)同步器

1,676 阅读10分钟

简介

在此之前介绍ReentrantLockReentrantReadWriteLock中都有sync属性,而sync正是继承了AQS(AbstractQueuedSynchronizer)同步器。AQS采用模板设计模式,调用其模板方法(独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况),重写指定方法,我们自身就能利用AQS构造出自定义同步组件。

AQS解析

重要属性


    //等待队列的头节点
    private transient volatile Node head;
    //等待队列的尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;

AQS内部通过head、tail定义了一个FIFO队列,state表示同步状态(0表示未有线程获取同步状态或锁,大于0表示有线程占有),都通过volatile修饰,保证了内存可见性

重要内部类


static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;
    /**因为在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化 */
    static final int CANCELLED =  1;
    /** 
     * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者取消 
     * 将会通知后继节点,使后继节点的线程运行
     */ 
    static final int SIGNAL    = -1;
    /**
     * 节点在等待队列中,节点线程等待在Condtion上,当其他线程对Condtion调用了signal()方法后
     * 该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 
     */
    static final int CONDITION = -2;
    /**  
     * 表示下一次共享式同步状态获取将会无条件被传播下去
     */
    static final int PROPAGATE = -3;
    /** 当前节点等待状态 */
    volatile int waitStatus;
    /** 前驱节点 */
    volatile Node prev;
    /** 后继节点 */
    volatile Node next;
    /** 节点关联的线程 */
    volatile Thread thread;
    /** 
     * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,即节点类型(独占和共享)
     * 和等待队列中的后继节点公用同一个字段
     */
    Node nextWaiter;
    }

node节点是构成同步队列的基础,pre、next前驱后继维护了一个双向队列,同步队列结构如图:

当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中

独占式获取与释放同步状态

  • 独占式获取同步状态
  • 独占锁的lock方法都会调用AQS所提供的模板方法acquire(),当线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出
    
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    模板主要思路:

    ①.调用tryAcquire()方法保证线程安全地获取同步状态(或者锁),此方法自定义同步器自己实现
    ②.若获取失败,则构造同步节点,调用addWaiter()将该节点加入同步队列尾部
    ③.最后调用acquireQueued()死循环获取同步状态
    ④.如果获取不到,调用shouldParkAfterFailedAcquire()方法判断是否需要阻塞,若返回true阻塞节点中的线程,可以依靠前驱节点的出队或阻塞线程被中断来唤醒阻塞线程

    tryAcquire

    tryAcquire()方法体内部只抛异常,若自定义同步器为独占式获取同步状态必须重写此方法

    
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    addWaiter

    将节点加入同步队列

    
        private Node addWaiter(Node mode) {
            //新建Node
            Node node = new Node(Thread.currentThread(), mode);
            // CAS快速尝试尾插节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //多次尝试
            enq(node);
            return node;
        }
    

    若队列为空或者cas设置失败后,调用enq自旋再次设置

    
        private Node enq(final Node node) {
            // 死循环
            for (;;) {
                // 获取尾结点
                Node t = tail;
                // 若队列为空,初始化
                if (t == null) { // Must initialize
                    // cas设置头节点
                    if (compareAndSetHead(new Node()))
                        // 设置尾结点
                        tail = head;
                } else {
                    // CAS设置尾结点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    从源码中可以发现若同步队列添加节点失败后,会死循环一直尾插下去直至添加成功

    acquireQueued

    节点进入同步队列之后,就进入了一个自旋的过程,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则会一直执行下去

    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 中断标记
                boolean interrupted = false;
                // 自旋死循环
                for (;;) {
                    // 获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    // 若前驱节点为头节点且获取同步状态成功
                    if (p == head && tryAcquire(arg)) {
                        // 设置头节点
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 判断线程是否需要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    从源码中可以发现只有当前节点的前驱节点是头节点才能尝试获取同步状态,其原因在于:
    ①.头节点是成功获取到同步状态的节点,而头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点
    ②.保持FIFO同步队列原则

    阻塞

    加入队列后,会自旋不断获取同步状态,但是自旋过程中需要判断当前线程是否需要阻塞

    
        if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
        
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前驱节点等待状态
            int ws = pred.waitStatus;
            // 若前驱节点状态为SIGNAL,表明当前节点处于等待状态,返回true
            if (ws == Node.SIGNAL)
                return true;
            // 若前驱节点状态>0即取消状态,表明前驱节点已经等待超时或者被中断了,需要从同步队列中取消
            if (ws > 0) {
                // 循环遍历,直至处于当前节点前面的节点不为取消状态为止
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            // 前驱节点状态为CONDITION,PROPAGATE
            } else {
                // CAS设置前驱节点状态为SINNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    若shouldParkAfterFailedAcquire返回true,会调用parkAndCheckInterrupt方法,其方法内部主要调用LockSupport工具类的park()方法阻塞线程

    
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            // 返回当前线程的中断状态
            return Thread.interrupted();
        }
    

    acquire()方法流程

  • 独占式释放同步状态
  • 当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态,AQS提供了release()方法释放同步状态,方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态),同样自定义同步器需要重写tryRelease()释放同步状态
    
        public final boolean release(int arg) {
            // 若释放同步状态成功
            if (tryRelease(arg)) {
                // 获取同步队列头节点
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    // 唤醒头节点的后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    private void unparkSuccessor(Node node) {
        // 获取当前节点等待状态
        int ws = node.waitStatus;
        // 若状态为SIGNAL、CONDITION或PROPAGATE,CAS将其状态置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取后继节点
        Node s = node.next;
        // 若后继节点为null或其状态为CANCELLED(等待超市或者被中断)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾结点遍历
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒节点所关联的线程
            LockSupport.unpark(s.thread);
    }
    

    从源码中可以发现唤醒的节点从尾遍历而不是从头遍历,原因是当前节点的后继可能为null、等待超时或被中断,所以从尾部向前进行遍

    共享式同步状态获取与释放

    共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状 态,例如ReentrantReadWriteLock中的读锁

  • 共享式获取同步状态
  • 
        public final void acquireShared(int arg) {
            // 若获取失败自旋再次尝试
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    首先tryAcquireShared()尝试获取同步状态,若返回值大于等于0时表明获取成功,否则调用doAcquireShared()自旋获取同步状态
    
        private void doAcquireShared(int arg) {
            // 将共享节点加入同步队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 中断标记
                boolean interrupted = false;
                // 死循环
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    // 若前驱节点为头节点
                    if (p == head) {
                        // 获取同步
                        int r = tryAcquireShared(arg);
                        // 若获取成功
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // 判断线程是否需要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

  • 共享式释放同步状态
  • 
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    释放了同步状态之后,会唤醒后续处于等待状态的节点,同样自定义同步器需要重写tryRelease()释放同步状态。不过因为是共享,会存在多个线程同时释放同步状态,所以采用CAS,当CAS操作失败自旋循重试

    超时获取同步状态

    使用内置锁synchronized同步,可能会造成死锁,而AQS提供了超时获取同步状态,即在指定时间段内获取同步状态

  • 独占式超时获取同步状态
  • 相对于上面介绍的acquire()方法(此方法无法响应中断),AQS为了响应中断额外提供了acquireInterruptibly()方法,若当前线程被中断会立即响应中断抛出异常
    
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
    该方法首先判断线程是否中断,若是抛出异常;否则执行tryAcquire()方法获取同步状态,获取成功直接结束否则执行doAcquireInterruptibly(),与acquireQueued()类似,最大区别在于其不再使用interrupted标志,直接抛出InterruptedException异常
    
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    tryAcquireNanos()方法超时获取同步状态是响应中断获取同步状态的"增强版",增加了超时控制
    
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 超时时间    
        final long deadline = System.nanoTime() + nanosTimeout;
        // 将独占节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 死循环自旋
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 若前驱节点为头节点且获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 若获取失败,判断是否超时
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 判断线程是否中断    
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    其思路:首先记录deadline超时时间获取同步状态,若获取失败判断是否超时,没有超时则计算剩余等待时间,若剩余时间小于等于0表明已经超时,若没有则判断是否大于spinForTimeoutThreshold(1000L),如果大于使用阻塞方式等待,否则仍然自旋等待,使用了LockSupport.parkNanos()方法来实现限时地等待,并支持中断

  • 共享式超时获取同步状态
  • 共享式获取响应中断doAcquireSharedInterruptibly()方法与共享式获取同步状态也类似,区别也是不再使用interrupted标志,直接抛出InterruptedException异常。共享式超时获取大体思路也差不多,不再多述。

    感谢

    《java并发编程的艺术》