AQS队列同步器

231 阅读3分钟

AQS队列同步器

lockdsaads.png 先了解几个状态

/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

//从队列中移除
static final int CANCELLED =  1;
//其后继结点等待唤醒,SINGAL是挂起状态
static final int SIGNAL    = -1;
//条件唤醒
static final int CONDITION = -2;
/**
*waitStatus值,指示下一个acquireShared应该
*无条件传播
*/
static final int PROPAGATE = -3;

1.立即获取的方式 tryAcquire

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

必须重写这个方法,模板模式

2.愿意等待的方式 Acquire

/**

*以独占模式获取,忽略中断。实施

*通过至少调用一次{@link#tryAcquire},

*成功的回报。否则线程可能会排队

*反复阻塞和取消阻塞,调用{@link

*#努力获取}直到成功。可以使用这种方法

*实现方法{@link Lock#Lock}。

*

*@param arg获取参数。此值传递给

*{@link#tryAcquire}但在其他方面没有解释,而且

*可以代表你喜欢的任何东西。

*/
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Acquire

1.首先进行tryAcquire(arg);

如果tryAcquire()为true就跳出if判断,如果tryAcquire()为false就继续执行逻辑

2.addWaiter();

从尾部结点快速入队

static final Node EXCLUSIVE = null;
/**

*为当前线程和给定模式创建节点并将其排队。

*

*@param模式节点.独占为独家,节点.共享用于共享

*@返回新节点

*/
private Node addWaiter(Node mode) {
	/**用于指示节点正在独占模式下等待的标记*/
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这一步的疑问?

并发的情况下,假如尾结点同时指向了两个结点,必然会有一个因为cmpxchg操作失败从而进入完整入队

if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

尾部快速入队失败、进入完整入队方法

重复的判空可能存在性能问题,先快速入队再完整入队

//封装为一个Node,循环cmpxchg操作,最终必然会假如到队列当中去
 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

AcquireQueued(addWaiter(Node.EXCLUSIVE), arg))

Acquire中会对结点的状态进行判断,决定其需要唤醒还是挂起。

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获得前置结点
                final Node p = node.predecessor();
                //如果前置结点为head,尝试得到锁
                if (p == head && tryAcquire(arg)) {
                    //把自己设置为头节点
                    setHead(node);
                    //help GC
                    p.next = null; // help GC
                    //没有失败
                    failed = false;
                    return interrupted;
                }
                //p不是头节点,进入逻辑判断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             
             //if (true && parkAndCheckInterrupt())
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                //>0 就是Cancel状态,把cancel的结点删掉
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
            
            //if (false && parkAndCheckInterrupt())
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //需要处理前置结点的状态
             //-2 -3的状态改为-1
             //更改为SINGAL状态并且返回true
             
             //if (false && parkAndCheckInterrupt())
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

调用support原语挂起线程

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Release

release

public final boolean release(int arg) {
			//没有tryRelease会抛出异常
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

从后面找到第一个singal的结点,唤醒

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

头结点始终停留在这里等待拿锁 image.png

阻塞停留在这里 image.png 线程挂起park的时候停留在这个状态 image.png