Java并发(7)- 你真的了解 ReentrantReadWriteLock 吗?

11,352 阅读13分钟

引言

在前几篇文章中了解了ReentrantLock、Semaphore与CountDownLatch后,J.U.C包中基于AQS实现的并发工具类还剩一个比较重要的:读写锁ReentrantReadWriteLock。读写锁在Java面试过程中是一个经常性考的题目,他涉及到的知识点比较多,导致很多人不能透彻的理解他。举几个面试中常见的问题:

  • ReentrantReadWriteLock和ReentrantLock的区别?
  • 你能用ReentrantReadWriteLock实现一个简单的缓存管理吗?
  • 你能自己实现一个简单的读写锁吗?
  • ReentrantReadWriteLock会发生写饥饿的情况吗?如果发生,有没有比较好的解决办法?

上面的问题大家都能回答出来吗?如果都能很好的回答,那么这篇文章也许对你没有太大帮助。如果不能,别着急,下面就来一一分析解答上面的几个问题。

1. ReentrantReadWriteLock和ReentrantLock的区别?

这个问题大家应该都会回答:ReentrantLock是独占锁,ReentrantReadWriteLock是读写锁。那么这就引申出下一个问题:ReentrantReadWriteLock中的读锁和写锁分别是独占还是共享的?他们之间的关系又是什么样的?要了解透彻这两个问题,分析源码是再好不过的了。

1.1 ReentrantReadWriteLock中的读锁和写锁的实现方式

使用ReentrantReadWriteLock读写锁的方式,会调用readLock()和writeLock()两个方法,看下他们的源码:

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

可以看到用到了WriteLock和ReadLock两个静态内部类,他们对锁的实现如下:

public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1); //共享
    }

    public void unlock() {
        sync.releaseShared(1); //共享
    }
}

public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1); //独占
    }

    public void unlock() {
        sync.release(1); //独占
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {}

看到这里发现了ReentrantReadWriteLock和ReentrantLock的一个相同点和不同点,相同的是使用了同一个关键实现AbstractQueuedSynchronizer,不同的是ReentrantReadWriteLock使用了两个锁分别实现了AQS,而且WriteLock和ReentrantLock一样,使用了独占锁。而ReadLock和Semaphore一样,使用了共享锁。再往下的内容估计看过前面几篇文章的都很熟悉了,独占锁通过state变量的0和1两个状态来控制是否有线程占有锁,共享锁通过state变量0或者非0来控制多个线程访问。在上面的代码中,ReadLock和WriteLock使用了同一个AQS,那么在ReentrantReadWriteLock中又是怎么控制读锁和写锁关系的呢?

1.2 ReadLock和WriteLock共享变量

读写锁定义为:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。

通过这句话很容易联想到通过两个不同的变量来控制读写,当获取到读锁时对读变量+1,当获取懂啊写变量时对写变量+1。但AQS中并没有为ReadLock和WriteLock添加额外的变量,还是通过一个state来实现的。那是怎么做到读写分离的呢?来看看下面这段代码:1。但AQS中并没有为ReadLock和WriteLock添加额外的变量,还是通过一个state来实现的。那是怎么做到读写分离的呢?来看看下面这段代码:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

这段代码在Sync静态内部类中,这里有两个关键方法sharedCount和exclusiveCount,通过名字可以看出sharedCount是共享锁的数量,exclusiveCount是独占锁的数量。共享锁通过对c像右位移16位获得,独占锁通过和16位的1与运算获得。举个例子,当获取读锁的线程有3个,写锁的线程有1个(当然这是不可能同时有的),state就表示为0000 0000 0000 0011 0000 0000 0000 0001,高16位代表读锁,通过向右位移16位(c >>> SHARED_SHIFT)得倒10进制的3,通过和0000 0000 0000 0000 1111 1111 1111 1111与运算(c & EXCLUSIVE_MASK),获得10进制的1。弄懂了着几个方法,就明白了为什么通过一个state实现了读写共享。

这当中还有一个问题,由于16位最大全1表示为65535,所以读锁和写锁最多可以获取65535个。

1.3 WriteLock和ReentrantLock获取锁的区别

上面说过,WriteLock也是独占锁,那么他和ReentrantLock有什么区别呢?最大的区别就在获取锁时WriteLock不仅需要考虑是否有其他写锁占用,同时还要考虑是否有其他读锁,而ReentrantLock只需要考虑自身是否被占用就行了。来看下WriteLock获取锁的源代码:

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //尝试获取独占锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取失败后排队
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();  //获取共享变量state
    int w = exclusiveCount(c); //获取写锁数量
    if (c != 0) { //有读锁或者写锁
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread()) //写锁为0(证明有读锁),或者持有写锁的线程不为当前线程
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);  //当前线程持有写锁,为重入锁,+acquires即可
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) //CAS操作失败,多线程情况下被抢占,获取锁失败。CAS成功则获取锁成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

这段代码是不是很熟悉?和ReentrantLock中获取锁的代码很相似,差别在于其中调用了exclusiveCount方法来获取是否存在写锁,然后通过c != 0和w == 0判断了是否存在读锁。acquireQueued和addWaiter就不详细解说了,需要了解的可以查看前面ReentrantLock的文章。 到这里大家应该对ReentrantReadWriteLock和ReentrantLock的区别应该做到心里有数了吧。

1.4 ReadLock和Semaphore获取锁的区别

WriteLock是独占模式,我们比较了它和ReentrantLock独占锁获取锁的区别,这里我们再看看ReadLock在获取锁上有什么不同呢?先看下面的源代码:

protected final int tryAcquireShared(int unused) {

	Thread current = Thread.currentThread();
	int c = getState();
	if (exclusiveCount(c) != 0 &&
		getExclusiveOwnerThread() != current) //写锁不等于0的情况下,验证是否是当前写锁尝试获取读锁
		return -1;
	int r = sharedCount(c);  //获取读锁数量
	if (!readerShouldBlock() && //读锁不需要阻塞
		r < MAX_COUNT &&  //读锁小于最大读锁数量
		compareAndSetState(c, c + SHARED_UNIT)) { //CAS操作尝试设置获取读锁 也就是高位加1
		if (r == 0) {  //当前线程第一个并且第一次获取读锁,
			firstReader = current;
			firstReaderHoldCount = 1;
		} else if (firstReader == current) { //当前线程是第一次获取读锁的线程
			firstReaderHoldCount++;
		} else { // 当前线程不是第一个获取读锁的线程,放入线程本地变量
			HoldCounter rh = cachedHoldCounter;
			if (rh == null || rh.tid != getThreadId(current))
				cachedHoldCounter = rh = readHolds.get();
			else if (rh.count == 0)
				readHolds.set(rh);
			rh.count++;
		}
		return 1;
	}
	return fullTryAcquireShared(current);
}ß

在上面的代码中尝试获取读锁的过程和获取写锁的过程也很相似,不同在于读锁只要没有写锁占用并且不超过最大获取数量都可以尝试获取读锁,而写锁不仅需要考虑读锁是否占用,也要考虑写锁是否占用。上面的代码中firstReader,firstReaderHoldCount以及cachedHoldCounter都是为readHolds(ThreadLocalHoldCounter)服务的,用来记录每个读锁获取线程的获取次数,方便获取当前线程持有锁的次数信息。在ThreadLocal基础上添加了一个Int变量来统计次数,可以通过他们的实现来理解:

static final class ThreadLocalHoldCounter
	extends ThreadLocal<HoldCounter> { //ThreadLocal变量ß
	public HoldCounter initialValue() {
		return new HoldCounter();
	}
}

static final class HoldCounter {
	int count = 0;           //当前线程持有锁的次数
	// Use id, not reference, to avoid garbage retention
	final long tid = getThreadId(Thread.currentThread());   //当前线程ID
}

2. 你能用ReentrantReadWriteLock实现一个简单的缓存吗?

先来分析一个简单缓存需要满足的功能,这里我们为了实现简单,不考虑缓存过期策略等复杂因素。

  • 缓存主要提供两个功能:读和写。
  • 读时如果缓存中存在数据,则立即返回数据。
  • 读时如果缓存中不存在数据,则需要从其他途径获取数据,同时写入缓存。
  • 在写入缓存的同时,为了避免其他线程同时获取这个缓存中不存在的数据,需要阻塞其他读线程。 下面我们就来通过ReentrantReadWriteLock实现上述功能:
public static void ReentrantReadWriteLockCacheSystem() {

	//这里为了实现简单,将缓存大小设置为4。
	Map<String, String> cacheMap = new HashMap<>(4); 
	ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	for (int i = 0; i < 20; i++) { //同时开启20个线程访问缓存

		final String key = String.valueOf(i % 4); 
		Thread thread = new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					//①读取缓存时获取读锁
					readWriteLock.readLock().lock();  
					//获取读锁后通过key获取缓存中的值
					String valueStr = cacheMap.get(key); 
					//缓存值不存在
					if (valueStr == null) { 
						//③释放读锁后再尝试获取写锁
						readWriteLock.readLock().unlock();  
						try {
							//④获取写锁来写入不存在的key值,
							readWriteLock.writeLock().lock();  
							valueStr = cacheMap.get(key);
							if (valueStr == null) {
								valueStr = key + " --- value";
								cacheMap.put(key, valueStr); //写入值
								System.out.println(Thread.currentThread().getName() + " --------- put " + valueStr);
							}
							// ⑥锁降级,避免被其他写线程抢占后再次更新值,保证这一次操作的原子性
							readWriteLock.readLock().lock(); 
							System.out.println(Thread.currentThread().getName() + " --------- get new " + valueStr);
						} finally {
							readWriteLock.writeLock().unlock(); //⑤释放写锁
						}
						
					} else {
						System.out.println(Thread.currentThread().getName() + " ------ get cache value");
					}
				} finally {
					readWriteLock.readLock().unlock();  //②释放读锁
				}
			}
		}, String.valueOf(i));
		thread.start();
	}
}

首先线程会尝试去获取数据,需要获取读锁①,如果存在值,则直接读取并释放读锁②。如果不存在值,则首先释放已经获取的读锁③,然后尝试获取写锁④。获取到写锁之后,再次检查值,因为此时可能存在其他写锁已经更新值,这时只需要读取,然后释放写锁⑤。如果还是没有值,则通过其他途径获取值并更新然后获取读锁⑥,这一步锁降级操作是为了直接抢占读锁,避免释放写锁之后再次获取读锁时被其他写线程抢占,这样保证了这一次读取数据的原子性。之后再执行⑤释放写锁和②释放读锁。

执行后输出结果如下,每次执行可能输出不同:

//1 --------- put 1 --- value
//1 --------- get new 1 --- value
//0 --------- put 0 --- value
//0 --------- get new 0 --- value
//9 ------ get cache value
//4 ------ get cache value
//2 --------- put 2 --- value
//2 --------- get new 2 --- value
//11 --------- put 3 --- value
//11 --------- get new 3 --- value
//5 ------ get cache value
//13 ------ get cache value
//6 ------ get cache value
//8 ------ get cache value
//7 ------ get cache value
//3 --------- get new 3 --- value
//10 ------ get cache value
//12 ------ get cache value
//14 ------ get cache value
//15 ------ get cache value
//16 ------ get cache value
//17 ------ get cache value
//18 ------ get cache value
//19 ------ get cache value

3. 你能自己实现一个简单的读写锁吗?

经过上面对读写锁原理的初步分析和使用,现在你能自己实现一个简单的读写锁了吗?这里列出了一步步实现一个简单的读写锁的过程,你可以按这个步骤自己先实现一遍。

  • 1 定义一个读写锁共享变量state
  • 2 state高16位为读锁数量,低16位为写锁数量。尽量模拟ReentrantReadWriteLock的实现
  • 3 获取读锁时先判断是否有写锁,有则等待,没有则将读锁数量加1
  • 4 释放读锁数量减1,通知所有等待线程
  • 5 获取写锁时需要判断读锁和写锁是否都存在,有则等待,没有则将写锁数量加1
  • 6 释放写锁数量减1,通知所有等待线程 我给出的实现代码如下:
public class MyReadWriteLock {

	private int state = 0; //1. 定义一个读写锁共享变量state
	
	//2. state高16位为读锁数量
	private int GetReadCount() { 
		return state >>> 16;
	}
	
	//2. 低16位为写锁数量
	private int GetWriteCount() { 
		return state & ((1 << 16) - 1);
	}
	
	//3. 获取读锁时先判断是否有写锁,有则等待,没有则将读锁数量加1
	public synchronized void lockRead() throws InterruptedException{  
		
		while (GetWriteCount() > 0) {
			wait();
		}
		
		System.out.println("lockRead ---" + Thread.currentThread().getName()); 
		state = state + (1 << 16);
	}
	
	//4. 释放读锁数量减1,通知所有等待线程
	public synchronized void unlockRead() {  
		state = state - ((1 << 16));
		notifyAll();
	}
	
	//5. 获取写锁时需要判断读锁和写锁是否都存在,有则等待,没有则将写锁数量加1
	public synchronized void lockWriters() throws InterruptedException{  
		
		while (GetReadCount() > 0 || GetWriteCount() > 0) {
			wait();
		}
		System.out.println("lockWriters ---" + Thread.currentThread().getName());
		state++;
	}
	
	//6. 释放写锁数量减1,通知所有等待线程
	public synchronized void unlockWriters(){  
		
		state--;
		notifyAll();
	}
}

4. 读写锁会发生写饥饿的情况吗?如果发生,有没有比较好的解决办法?

在读写的过程中,写操作一般是优先的,不能因为读操作过多,写操作一直等待的状况发生,这样会导致数据一直得不到更新,发生写饥饿。现在大家思考一下上面我们实现的简单读写锁,是否能做到这点呢?答案很明显,在读写线程都wait的情况下,通过notifyAll并不能保证写优先执行。那在这个例子中怎么改进这一点呢?

这里我通过添加一个中间变量来达到这个目的,这个中间变量在获取写锁之前先记录一个写请求,这样一旦notifyAll,优先检查是否有写请求,如果有,则让写操作优先执行,具体代码如下:

public class MyReadWriteLock {

	private int state = 0; //1. 定义一个读写锁共享变量state
	private int writeRequest = 0; //记录写请求数量
	
	//2. state高16位为读锁数量
	private int GetReadCount() { 
		return state >>> 16;
	}
	
	//2. 低16位为写锁数量
	private int GetWriteCount() { 
		return state & ((1 << 16) - 1);
	}
	
	//3. 获取读锁时先判断是否有写锁,有则等待,没有则将读锁数量加1
	public synchronized void lockRead() throws InterruptedException{  
		//写锁数量大于0或者写请求数量大于0的情况下都优先执行写
		while (GetWriteCount() > 0 || writeRequest > 0) { 
			wait();
		}
		
		System.out.println("lockRead ---" + Thread.currentThread().getName()); 
		state = state + (1 << 16);
	}
	
	//4. 释放读锁数量减1,通知所有等待线程
	public synchronized void unlockRead() {  
		state = state - ((1 << 16));
		notifyAll();
	}
	
	//5. 获取写锁时需要判断读锁和写锁是否都存在,有则等待,没有则将写锁数量加1
	public synchronized void lockWriters() throws InterruptedException{  
		
		writeRequest++; //写请求+1
		while (GetReadCount() > 0 || GetWriteCount() > 0) {
			wait();
		}
		writeRequest--; //获取写锁后写请求-1
		System.out.println("lockWriters ---" + Thread.currentThread().getName());
		state++;
	}
	
	//6. 释放写锁数量减1,通知所有等待线程
	public synchronized void unlockWriters(){  
		
		state--;
		notifyAll();
	}
}

大家可以测试下上面的代码看看是不是写请求都优先执行了呢?现在我们把这个问题放到ReentrantReadWriteLock中来考虑,显而易见,ReentrantReadWriteLock也会发生写请求饥饿的情况,因为写请求一样会排队,不管是公平锁还是非公平锁,在有读锁的情况下,都不能保证写锁一定能获取到,这样只要读锁一直占用,就会发生写饥饿的情况。那么JDK就没有提供什么好办法来解决这个问题吗?当然是有的,那就是JDK8中新增的改进读写锁---StampedLock,在下一篇文章中将会对StampedLock进行详细讲解。