Java并发-AQS

212 阅读8分钟

LockSupport

作用:唤醒和阻塞线程,用来构建同步工具

  • 以park开头的方法为阻塞线程
 public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
  • unpack(Thread thread)唤醒线程
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

AbstractQueuedSynchronizer

AQS使用的设计模式

模板方法(Template Method)

image-20200417091941574

  • **AblstractClass(抽象类):**在抽象类中定义了一系列的操作PrimitiveOperation,每个操作可以使具体的,也可以是抽象的,每个操作对应一个算法的步骤,在子类中可以重新定义或实现这些步骤。TmplateMethod()这个方法用于定义一个算法结构,模板方法不仅可以调用在抽象类中实现的基本方法,也可以调用在抽象类的子类中实现的基本方法,还可以调用其他对象中的方法。
  • **ConcreteClass(具体子类):**用于实现在父类中声明的抽象基本操作,也可以覆盖在父类中已经实现的具体基本操作。

下面我们可以手写一个模板方法的设计模式

定义一个抽象类

public abstract class SendCustom {
    public abstract void from();

    public abstract void to();

    public abstract void content();

    public abstract void send();


    public void date() {
        System.out.println(new Date());
    }

    public void sendMessage() {
        from();
        to();
        content();
        date();
        send();
    }


}

定义一个继承类

public class SendSms extends SendCustom {
    @Override
    public void from() {
        System.out.println("from");
    }

    @Override
    public void to() {
        System.out.println("to");
    }

    @Override
    public void content() {
        System.out.println("content");
    }

    @Override
    public void send() {
        System.out.println("send");
    }

    public static void main(String[] args) {
        SendCustom custom = new SendSms();
        custom.sendMessage();
    }
}

输出如下

from
to
content
Fri Apr 17 09:28:07 CST 2020
send
AQS中的方法

模板方法

  • 独占式获取

    • aquire
    • aquireInterruptly
    • tryAquireNanos
  • 共享式获取

    • aquireShared
    • aquireSharedInterruptly
    • tryAquireSharedNanos
  • 独占式释放

    • release
  • 共享式释放

    • releaseShared

需要覆盖的流程方法

  • 独占式获取:tryAcquire
  • 独占式释放:tryRelease
  • 共享式获取:tryAcquireShared
  • 共享式释放:tryReleaseShared
  • 这个同步器是否处于独占模式:isHeldExlusively

同步状态:state

  • 获取:getState
  • 设置:
    • setState
    • compareAndSetState,使用CAS保证原子性
设置一个类似ReentrantLock的类
public class SelfLock implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            return 1 == getState();
        }

        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (0 == getState()) {
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCond() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCond();
    }
}

编写一个测试类

public class TestMyLock {
    public void test() {
        final Lock lock = new SelfLock();
        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);

                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            worker.setDaemon(true);
            worker.start();

        }

        for (int i = 0; i < 10; i++) {
            SleepTools.second(1);
            System.out.println();
        }
    }

    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
}

输出如下

Thread-0



Thread-1


Thread-2


Thread-3

Thread-4
AQS中的数据结构-节点和同步队列

image-20200417102043920

如上图所示,AQS是一个双向的链表,每个节点由「前驱」,「后继」指针,还有每个节点包括了一个线程的信息,,采用的是「先进先出」的方式。

竞争失败的线程会打包成Node放到同步队列,Node可能的状态里:

  • CANCELLED:线程等待超时或者被中断了,需要从队列中移走

  • SIGNAL:后续的节点等待状态,当前节点,通知后面的节点去运行

  • CONDITION :当前节点处于等待队列

  • PROPAGATE:共享,表示状态要往后面的节点传播

  • 0, 表示初始状态

节点加入队列

image-20200417102118684

队列释放首节点

image-20200417102125918

以下为独占式同步状态的获取与释放

image-20200417103034724

结合源码如下

获取锁

//1 获取锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//2获取失败生成节点,并加入队列
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
  			//若前驱不为空 CAS加入对列,并返回
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
  			//若前驱为空 ,加入对类返回节点
        enq(node);
        return node;
}
//加入队列,返回节点
//自旋检查,先判断是否存在头节点,若存在加入队列返回节点,若不存在,初始化头节点
 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
//3.自旋判断前驱节点并且能获取到锁,若都能将此节点设为头节点,让后将此节点的后继脱落队列,返回中断为false
//若不能,看第4步,
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//4.获取节点的状态,并且修改节点状态,然后阻塞线程,返回中断为true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

释放锁

//能否释放线程,若能判断头节点不为空且头节点状态不为0,就唤醒头节点
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
共享式同步状态获取与释放

相比于独占式多了一个setHeadAndPropagate(node, r);的方法,作用是偏移,类似于SemaPhore,若共享线程有10个

		/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
独占式超时同步状态获取

即在独占模式上面增加了一个等待超时模式

/**
 * Attempts to acquire in exclusive mode, aborting if interrupted,
 * and failing if the given timeout elapses.  Implemented by first
 * checking interrupt status, then invoking at least once {@link
 * #tryAcquire}, returning on success.  Otherwise, the thread is
 * queued, possibly repeatedly blocking and unblocking, invoking
 * {@link #tryAcquire} until success or the thread is interrupted
 * or the timeout elapses.  This method can be used to implement
 * method {@link Lock#tryLock(long, TimeUnit)}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @param nanosTimeout the maximum number of nanoseconds to wait
 * @return {@code true} if acquired; {@code false} if timed out
 * @throws InterruptedException if the current thread is interrupted
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
/**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
实现一个奇葩点的三元共享同步工具类
public class TrinityLck  implements Lock {
    
    private Sync sync = new Sync(3);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryReleaseShared(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCond();
    }

    private static class Sync extends AbstractQueuedSynchronizer{


        Sync(int count){
            if(count<=0){
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;){
                int current = getState();
                int newCount  = current-reduceCount;
                if(newCount < 0 || compareAndSetState(current, newCount)){
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
        
        Condition newCond(){
            return new ConditionObject();
        }
    }
}
Condition

一个Condition包含一个等待队列,是一个单项列表

image-20200417112218868

同步队列与等待队列

一个AQS对应了多个Condition,所以signal不多唤醒其他Condition的阻塞线程

image-20200417112232898

await方法

当线程获取到锁的时候,等待Condition,若条件未满足,则吧该节点构造新的Condition节点加入Condition

image-20200417112305804

signal方法

当线程获取到锁的时候,调用了一个Condition的singal,则吧该Condition队列中的一个节点放入AQS队列等待执行。

image-20200417112316150

锁的可重入

我们刚刚写了一个独占式的锁,里面的额state就只只有0,1这样在写递归的时候,若线程a获得锁为释放的时候又要获取锁,理论上就会等待,而线程a必须在第二次获取到锁的时候才能继续执行,这样就死锁了,所以我嗯采取的是在state上面进行累加,而不是直接改变状态,进而记录获取锁的次数,若释放一次就减一次,类似于SemaPore。

非公平锁的实现

我们的公平锁获取锁的第一步是队列是否有尾节点,若存在,继续执行,而非公平锁则是第一步就直接去拿锁,拿不到在做其他的操作,这样非公平锁的效率跟高

ReentrantWriteReadLock的实现

在ReentrantWriteLReadLock中定义了两把锁,但是只有一个Sync对象,那么他的State是怎么记录的呢

image-20200417115657315