阅读 280

并发编程第九天---公平、非公平、读写锁源码

ReentrantLock

ReentrantLock 是可重入的独占锁,是用 AQS 来实现的,以下是其类图结构。

ReentrantLock 有两种锁模式,公平模式和非公平模式,我们先来看看非公平是如何体现的。


非公平模式

    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
    }
复制代码

先尝试获取锁的线程并不一定比后尝试获取锁的线程优先获得锁,比如:线程 A 尝试获取锁,然后发现 state 不为 0,于是加入阻塞队列,然后线程 B 来了后,发现 state 为 0,于是占有锁。这里线程 B 在获取锁前并没有查看当前 AQS 队列里面是否有比自己更早请求该锁的线程,而是使用了抢夺策略。这就是非公平的体现。


公平模式

    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
    }
复制代码

与非公平模式相比,在设置 CAS 前多了一个 hasQueuedPredecessors() 方法,该方法是实现公平性的核心代码。

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
复制代码

只有返回 false 时,当前线程才能获取锁。

当前队列为空或者当前节点是头结点则返回 false,否则证明当前节点前有节点排队,返回 true

h!=t && (s=h.next)==null,说明有一个元素即将作为 AQS 的第一个节点入队 (enq() 函数的第一个元素入队列是两步操作: 首先建一个哨兵头节点,然后将第一个元素插入到哨兵节点后),所以返回 true

公平模式保证了先来的线程先获取锁


读写锁 ReentrantReadWriteLock

解决线程安全问题使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同一时刻只有一个线程可以获取该锁,而实际中会有读多写少的场景。

显然 ReentrantLock 满足不了这个需求,所以 ReentrantReadWriteLock 应运而生,它采用了读写分离的策略,允许多个线程可以同时获取读锁,同时间只有一个线程能获取写锁

读写锁的内部维护了一个 ReadLockWriteLock,他们都是通过 AQS 实现具体功能 ,也都提供了公平和非公平的实现,下面我们介绍非公平的读写锁实现。

AQS 内部只有一个 state 变量,如何同时维护读状态和写状态呢? state 的高16位表示读状态,低16位表示写状态

abstract static class Sync extends AbstractQueuedSynchronizer{
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
        // 返回读锁的重入次数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        
        // 返回写锁的重入次数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        
        // 记录第一个获取读锁的线程
        private transient Thread firstReader = null;
        
        // 记录第一个获取读锁的线程的的重入次数
        private transient int firstReaderHoldCount;
        
        static final class HoldCounter {
            int count = 0;
            // 记录线程 id
            final long tid = getThreadId(Thread.currentThread());
        }
        
        
        // ThreadLocal型变量
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        
        // 用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数
        private transient ThreadLocalHoldCounter readHolds;
        
        // 记录最后一个获取读锁的线程的重入次数
        private transient HoldCounter cachedHoldCounter;
}
复制代码

获得读锁的重入数是 state 无符号右移 16 位,获取写锁的可重入次数是 state & 1...1( 16 个 1)readHoldsThreadLocal 型变量,用来记录线程(除去第一个获取读锁的线程)获取读锁的重入次数, firstReaderHoldCount 记录第一个获取读锁的线程的的重入次数,cachedHoldCounter 记录最后一个获取读锁的线程的重入次数。

我们先来看看读锁和写锁的一些特性,再看看他们是如何实现的

  • 有线程获取写锁时,其他线程不能获取读锁和写锁
  • 有线程获取读锁时,其他线程不能获得写锁,可以获取读锁
  • 写锁可变为读锁(锁降级)

写锁

写锁是用 ReentrantReadWriteLock.WriteLock 实现的。


写锁获取

ReentrantReadWriteLock 内部的 Sync 重写了 tryAcquire()

        protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c); // 写锁的重入次数
            
            // c!=0 说明读锁或者写锁已被某个线程获取
            if (c != 0) {
                // w == 0 表示有线程获取了读锁,返回 false ,w!=0 并且当前线程不是写锁拥有者,返回 false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            
            // 第一个线程获取写锁 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
复制代码

state!=0 时,说明读锁或者写锁已经被某个线程获取,当写锁的重入次数 w==0 时,表示有线程获取到了读锁,当前线程则获取写锁失败,如果 w!=0 并且当前线程是写锁的持有者,则更新 state 的值。

writerShouldBlock() 是公平和非公平的体现,在非公平模式下返回 false,公平模式下返回 hasQueuedPredecessors()

如果返回失败就调用 acquireQueued() 方法,和之前 ReentrantLock 流程一模一样。


写锁释放

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 获取可重入值,不用考虑高16位
            int nextc = getState() - releases;  
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            // 更新 state
            setState(nextc);
            return free;
        }
复制代码

这里需要注意的是,获取可重入值时不需要要考虑高 16 位,因为获取写锁时,读锁状态值肯定为 0


总结

写锁 ReentrantReadWriteLock.WriteLockReentrantLock 都是独占锁模式,他们除了 tryAcquire() 方法和 tryRelease() 方法不同外,其他方法一模一样。

如果有别的线程获取到了读锁或者写锁,则当前线程获取写锁失败。,


读锁

读锁是用ReentrantReadWriteLock.ReadLock 实现的


获取读锁

    public void lock() {
        sync.acquireShared(1);
    }
    
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
复制代码

读锁与写锁不同的是,读锁采用的共享模式


tryAcquireShared()

    protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 写锁被占用,并且当前线程不是获取写锁的线程
            if (exclusiveCount(c) != 0 &&   
                getExclusiveOwnerThread() != current)
                return -1;
                
            // 获取读锁计数
            int r = sharedCount(c);
            
            // 尝试获取锁
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { // CAS 设置读状态 + 1
                // 第一个线程获取读锁,初始化
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) { // 当前线程是第一个获取到读锁的线程
                    firstReaderHoldCount++;
                } else { // 说明已经有别的线程获取到了读锁
                    // 记录最后一个获取读锁的线程的可重入次数
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) // 当前线程是最后一个获取到读锁的线程
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 说明线程 CAS 失败或者 当前队列中第一个等待的线程是写线程
            return fullTryAcquireShared(current);
        }
复制代码

如果写锁被占用并且当前线程不是获取写锁的线程,则直接返回,如果当前线程是获取写锁的线程,则可实现锁降级,写锁变读锁。获取读锁失败的线程可能是因为 CAS 失败,也可能是因为避免写饥饿机制导致的,他们都会进入 fullTryAcquireShared()fullTryAcquireShared()tryAcquireShared() 十分类似,只不过前者通过循环自旋获取。

readShouldBlock() 是公平和非公平的体现,公平返回的是 hasQueuedPredecessors(),我们来看看非公平的实现。


避免写饥饿

    final boolean readerShouldBlock() {
        // 避免写饥饿
        return apparentlyFirstQueuedIsExclusive(); 
    }
    
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
复制代码

如果队列中的第一个排队的线程不为空,并且是独占模式的(写线程),则读锁获取失败(避免写饥饿)。


读锁释放

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码

tryReleaseShared()

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // 处理 firstReader 和 HoldCounter 计数
            if (firstReader == current) {
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0; // 只有当 nextc==0 时,返回 true
            }
        }
复制代码

for 循环中,知道当更新后 AQS 状态值为 0,代表此时已经没有读线程占用读锁,才返回 true,然后调用doReleaseShared(),释放一个由于获取写锁而被阻塞的线程。否则就一直循环自旋。

doReleaseShared()源码就不粘贴了,很简单。


总结

读锁的节点是共享模式,另外 写锁 和 ReentrantLock 都能实现 Condition 队列,和 AQS 原理大致类似。


用读写锁实现缓存

public class MyCache {
    static HashMap<String,Object> map = new HashMap<>();
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    // 根据 key 获取对应的 value
    public Object get(String key){
        readLock.lock();
        try{
            return map.get(key);
        }finally {
            readLock.unlock();
        }
    }

    public static void put(String key, Object value){
        writeLock.lock();
        try {
            map.put(key,value);
        }finally {
            writeLock.unlock();
        }
    }

    public static void clear(){
        writeLock.lock();
        try {
            map.clear();
        }finally {
            writeLock.unlock();
        }
    }
}
复制代码

使用读写锁保证了 map 的线程安全,在读操作 get() 方法中,获取读锁,这使 得并发访问该方法时不会被阻塞,既能保证可见性,也提升了效率。

关注下面的标签,发现更多相似文章
评论