AQS 队列同步器

476 阅读2分钟

1. AQS是什么

AQSAbstractQueuedSynchronizer的简称,是用来构建锁或者其他同步组件的基础框架,它使用int成员变量表示同步状态,通过内置的FIFO(First in first out)队列来完成资源获取线程的排队工作。

ReetrantLock、ReetrantReadWriteLock、Semaphore内部均有Sync抽象静态内部类同步实现,Sync子类实现了公平与不公平版本。

2. AQS与锁的关系?

AQS是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:

  1. 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节
  2. 同步器是面向锁的实现者,它简化了所得实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作
  3. 锁和同步器很好地隔离了使用者和是闲着所需关注的领域

3. 怎么使用?

定义同步组件,内部通过静态内部类Sync实现AQS同步器,代码如下:

package com.alwyn.nettysample.synchronizer;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        public boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {return new ConditionObject();}
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
}

通过springboot快速搭建工程,其中controller代码如下:

package com.alwyn.nettysample.controller;

import com.alwyn.nettysample.synchronizer.Mutex;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@RestController
public class HelloController {
    private int count = 0;
    private AtomicInteger integer = new AtomicInteger(0);
    Mutex lock = new Mutex();

    @RequestMapping("/hi")
    public String hi() {
        try {
            int i = integer.incrementAndGet();
            System.out.println("request count:" + i);
            boolean flag = lock.tryLock(100, TimeUnit.MILLISECONDS);
            if (flag) {
                count++;
                System.out.println("count = " + count);
                Thread.sleep(10);
                lock.unlock();
            } else {
                boolean b = lock.hasQueuedThreads();
                Collection<Thread> queuedThreads = lock.getQueuedThreads();
                for (Thread thread : queuedThreads) {
                    System.out.println(thread.getId() + "::" + thread.getName() + "::" + thread.getState() + "::" + thread.getThreadGroup());
                }
                System.out.println("获取锁失败:" + b);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        return "say hi world";
    }

    @RequestMapping("/reset")
    public void setValue() {
        integer.set(0);
        count = 0;
    }
}

通过Jmeter设置并发请求锁

结果: