并发编程第八天--------大厂必问之 AQS 源码解析

1,207

LockSupport 工具类

LockSupport 是个工具类,主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础

LockSupport 类与每个使用他的线程都会关联一个许可证,在默认情况下调用 LockSupport 类方法的线程是不持有许可证的LockSupport 使用 Unsafe 类实现的。


park() 方法

如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证,则调用Locksupport. park()时会马上返回,否则调用线程会被阻塞挂起

public static void main(String[] args) {
    System.out.println("beign park");
    
    LockSupport.park();
    
    System.out.println("end park");
}

这个例子中,主线程打印出 begin park 后就会被阻塞挂起,因为默认情况下调用线程不持有许可证。如何才能唤醒因为调用 LockSupport.park() 被阻塞的线程呢,有下面两种方法:

  • 其他线程调用 LockSupport.unpark(Thread t) ,以被阻塞的线程作为参数,那么被阻塞的线程会被唤醒。
  • 其他线程调用了被阻塞线程的 interrupt() ,被则塞的线程会被唤醒。
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(()->{
            System.out.println("t1 beign park");

            LockSupport.park();

            System.out.println("t1 end park");
        });

        t1.start();

        Thread.sleep(2000);
        
        // t1 被唤醒
        LockSupport.unpark(t1);  // 用这个也行 t1.interrupt();
    }

unpark()

当一个线程调用 unpakr() 时,如果线程没有持有与 LockSupport 关联的许可证,则会让线程持有许可证

public static void main(String[] args) throws InterruptedException {
    // 让主线程持有许可证
    LockSupport.unpark(Thread.currentThread());

    System.out.println("beign park");

    LockSupport.park();
    
    // LockSupport.park();  (1)
    
    System.out.println("end park");
}

此时 begin parkend park 都能被输出。

park() 方法不具有可重入性,将上面的代码 (1) 处注释打开,就会发现线程仍然会被阻塞。


Condition 接口

任意一个 Java 对象,都有一组监视器方法(定义在 java.lang.Object 上),主要包括 wait()notify()notifyAll(),这些方法与 synchronized 关键字配合,实现等待通知模式。

Condition 接口也提供了类似 Object 的监视器方法,Lock 配合可以实现等待通知模式await() 类似 wait()signal() 类似 notify()signalAll() 类似 notifyAll()

他们之间最主要的区别是 synchronized 监视器锁只能实现一个等待队列,而 lock 能实现多个等待队列 (一个 lock 对象可以创建出多个不同的 Condition 实例)。


使用

使用方法类似 Object 中的 wait()notify()

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();

        Condition condition = lock.newCondition();
        
        Thread t1 = new Thread(()->{
            lock.lock();
            System.out.println("t1 begin await");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t1 end await");
            lock.unlock();
        });

        t1.start();

        Thread.sleep(2000);

        System.out.println("begin signal");

        lock.lock();
        condition.signal();
        lock.unlock();
    }

Condition 对象是由 Lock 对象创建的。await()notify() 在使用前要先获取到锁。


AbstractQueuedSynchronizer

抽象同步队列 AbstractQueuedSynchronizer 简称 AQS 是用来构建锁或者其他同步组件的基础框架,通过内置的 FIFO 队列来完成同步状态的管理。以下是 AQS 的结构图。


public abstract class AbstractQueuedSynchronizer extends
    AbstractOwnableSynchronizer implements java.io.Serializable { 
    
    // 队列的头节点
    private transient volatile Node head;
    
    // 队列的尾节点
    private transient volatile Node tail;
    
    //同步状态
    private volatile int state;
    
    protected final int getState() { return state;}
    
    protected final void setState(int newState) { state = newState;}
    
    ...
}

AQS 内部维护了一个状态变量 state,对于 ReentrantLock 的实现,state 表示当前获取锁的线程的重入次数,对于读写锁来 ReentrantReadWriteLock 或者 CountDonwLatch 来说, state 又表示不同的含义,后面再详解。

对于 AQS 来说,线程同步的关键就是对状态值 state 进行操作,根据 state 的值判断锁是否被线程占用

同步器提供了 getState()setState()compareAndSetState() 三个方法来操作 state 变量。

exclusiveOwnerThread 记录了当前持有资源的线程。

AQS 是一个双向队列,内部通过 headtail 记录队首和队尾元素。

队列元素的类型是 Node。我们来看看 Node 节点的具体信息。


Node 节点

当前线程获取同步状态失败时,同步器会将当前线程以及等待信息构造成一个 Node 节点并将其加入同步队列。

static final class Node {
        
        // 表示当前节点在 共享模式下 等待的标志
        static final Node SHARED = new Node();
        // 表示当前节点在 独占模式下 等待的标志
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        // 等待状态,值为上面的四个常量值
        volatile int waitStatus;

        // 前驱节点
        volatile Node prev;

        // 后继节点
        volatile Node next;

        // 节点内的线程
        volatile Thread thread;
    }

每个节点除了存储当前线程和前后节点的引用外,还有一个 waitStatus,表示节点中线程的等待状态。

Node 节点有线程四种等待状态

  • CANCELLED,表示同步队列中等待的线程因为某种原因(比如中断、超时)而取消等待
  • SIGNAL,指示后续节点的线程需要 unpark()只有前驱节点的线程状态为 SIGNAL 时,当前节点的线程才能被阻塞
  • CONDITION : 指示线程正在条件队列中等待,用于条件队列
  • PROPAGATE : 不是很常用,可以不用知道

下面我们通过 ReentrantLock 的实现来深入分析入队和出队过程。


ReentrantLock

ReentrantLock 内部有一个 Sync 抽象类继承自 AbstractQueuedSynchronizer

abstract static class Sync extends AbstractQueuedSynchronizer {
        
        // 由 Sync 的子类实现
        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            // 具体实现....
        }

        protected final boolean tryRelease(int releases) {
            // 具体实现....
        }
        ...
    }

ReentranLock 锁有两种模式,公平模式和非公平模式,分别在 Sync 的两个子类 FairSyncNonfairSync 中实现。

    private final Sync sync;
    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

sync 变量的默认实现是非公平方式。我们下面分析的源码是基于非公平模式。


lock()

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

一个线程获取锁时,先尝试 CAS 设置 state,如果设置成功,则设置当前线程为锁的持有者,设置失败则尝试调用 acquire()


nonfairTryAcquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&  // 获取失败则调用 addWaiter()
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 非公平模式尝试获取,获取失败返回 false
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 再次尝试 CAS 设置 state 
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;  // 获取成功
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;  
    }
    return false; // 获取成功
}

如果 state 为 0,则直接 尝试 CAS 设置 state 并占有锁,或者当前线程是锁的持有线程,state 则 +1 (ReentrantLock 可重入的体现),否则获取失败,返回 false

获取失败后会调用 addWaiter()


addWaiter()

构造 Node 节点并入队。


// 返回新添加的节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // 构造 Node 节点
   
    Node pred = tail; 
    if (pred != null) {
        node.prev = pred;
        // 尝试将尾节点指向新节点,如果失败则调用 enq() 方法
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}


private Node enq(final Node node) {
    for (;;) { // 无限循环
        Node t = tail;
        if (t == null) {
            // 初始化队列
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将节点添加到队列中
            node.prev = t;
            // CAS 设置尾节点为添加的节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

初始化队列在 enq() 进行,将 headtail 同时指向一个空引用,队列初始化完成。

在节点入队的过程中,通过 CAS 将尾节点指向新添加的节点,保证了线程安全。

节点入队后,并不会立即阻塞挂起 ,而是会调用 acquireQueued(node,args) 方法,再次尝试获取锁资源,因为在节点入队的过程中,之前的线程可能执行完成。


acquireQueued()

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {  // 无限循环
                final Node p = node.predecessor(); // 当前节点的前驱节点
                if (p == head && tryAcquire(arg)) { // 如果 p 是头节点 并且 当前线程尝试获取锁成功
                    setHead(node);   // 头结点指针指向当前节点,并清空节点 thread 属性
                    p.next = null; // GC 回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

如果当前节点的前驱节点是头结点 并且 当前线程再次尝试获取锁成功后,则将 head 引用执行当前节点,并清空当前节点的 threadprev 信息,然后返回 false

否则执行 shouldParkAfterFailedAcquire(p,node),判断当前节点是否应该被则塞,如果返回 true 代表当前节点应该阻塞。


shouldParkAfterFailedAcquire()

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            
            // 只有前驱节点是 siganl 时,节点才能被则塞
            return true;
        if (ws > 0) {
            // 删除前驱节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 设置前驱节点的 waitStatus 为 -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false; // 返回 false
    }

只有当前面节点的线程状态为 SIGNAL 时,当前节点的线程才能被则塞挂起。

  • 如果 pred.waitStatus==0,则通过 CAS 修改其为 Node.SIGNAL
  • 如果 pred.waitStatus>0,则从队列中删除 pred 节点
  • 如果 pred.waitStatus==-1,则返回 true,表示当前节点应该被阻塞,然后调用parkAndCheckInterrupt() 方法阻塞线程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 阻塞
        return Thread.interrupted(); // 中断检测机制
    }

acquireQueued() 中可以看出,线程被唤醒后不一定能获取锁,还得再次竞争,这就是非公平的体现。


流程图总结


unlock()

我们来看看解锁的过程

    // 释放锁
    public void unlock() {
        sync.release(1);
    }

release()

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 释放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)  // 如果头节点不为空并且头节点的 waitstatus 不为 0
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 如果 c=0,则代表 state 为 1
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断当前线程是否是持有锁的线程
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;   
            setExclusiveOwnerThread(null); // 释放锁资源
        }
        setState(c); // 设置 state
        return free;
    }

释放锁成功后会调用 unparkSuccessor(head)


unparkSuccessor(head)

    private void unparkSuccessor(Node node) {
        
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); // CAS 设置 waitStatus

        
        
        Node s = node.next;  // 下一个节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);  // 唤醒下一个节点
    }

如果头节点的 waitStatus<0 ,通过 CAS 设置其为 0 。然后唤醒下一个节点


流程图总结