一、简介
ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问,
实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行,
在一定程度上降低了吞吐量(单位时间内能处理的请求数)。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock,
顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。
读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能,
读写锁主要应用于读多写少的场景。先知道ReentrantReadWriteLock的这几点注意事项①支持公平和非公平(如果使用无参的构造函数的获取锁)②读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占
③条件变量(Condition)只能在写锁使用,读锁不允许获取条件变量,否则将得到UnsupportedOperationException异常④支持可重入,如已获取读锁的线程,能够再次获取读锁⑤如已获取写锁的线程,之后能够再次获取写锁,同时也可以再获取读锁。⑥ReentrantReadWriteLock可能会导致获取写锁的线程饥饿,可以看我的另一篇StampedLock的源码分析。StampedLock可以解决获取写锁的线程饥饿。ReentrantReadWriteLock有内部类读锁(ReadLock)和写锁(WriteLock),内部都是调用内部类Sync进行读写锁的获取和释放,Sync有两个子类FairSync和NonFairSync,ReentrantReadWriteLock锁的状态维护在AQS中的state属性,会此在看这篇时,需先将AQS源码进行理解,下面会进行详细的介绍。
二、类关系
//读写锁的接口
public interface ReadWriteLock {
//获取读锁抽象方法
Lock readLock();
//获取写锁抽象方法
Lock writeLock();
}
三、属性
//读锁,ReentrantReadWriteLock的ReadLock内部类
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁,ReentrantReadWriteLock的WriteLock内部类
private final ReentrantReadWriteLock.WriteLock writerLock;
//AQS的实现类,在下面内部类中会进行介绍,AQS不清楚的,可以看我的另一篇AQS源码分析https://juejin.cn/post/6844903872939425805
final Sync sync;
//UnSafe不清楚的可以看下我的分享https://juejin.cn/user/2647279730952478
private static final sun.misc.Unsafe UNSAFE;
//线程Thread的属性tid的相对偏移量,定义成静态是因为属性tid相对每个Thread实例的相对偏移量都是一样
private static final long TID_OFFSET;
static {
try {
//获取UnSafe实例,如果也想在自己的代码中使用UnSafe,也可以看下我的分享,里面有介绍到如何获取到UnSafe实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
//获取Thread类的Class
Class<?> tk = Thread.class;
//使用UnSafe获取Thread类的属性tid相对偏移量
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
//抛出错误
throw new Error(e);
}
}
四、构造函数
//无参构造函数,创建非公平锁
public ReentrantReadWriteLock() {
//调用有参的构造函数
this(false);
}
/**
* 传入fair参数创建ReentrantReadWriteLock实例
*
* @param fair true创建公平锁,false创建非公平锁
*/
public ReentrantReadWriteLock(boolean fair) {
//true创建公平锁,false创建非公平锁
sync = fair ? new FairSync() : new NonfairSync();
//创建读锁,ReadLock会在下面内部类中进行介绍
readerLock = new ReadLock(this);
//创建写锁,WriteLock会在下面内部类中进行介绍
writerLock = new WriteLock(this);
}
五、内部类
- sync内部类
//ReadLock和WriteLock的获取和释放都是使用Sync进行获取和释放,AQS的属性state即表示锁的状态 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //分别存放读锁和写锁被获取的位数,即AQS的属性state(类型int)高16位存放读锁被获取个数,低16位存放写锁被获取个数 static final int SHARED_SHIFT = 16; //由于AQS属性state的高16位存放读锁被获取的个数,为此每次加读锁,获取一个读锁的最小单元 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁和写锁的锁被获取个数都不能大于的最大值 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //AQS属性state低16位存放写锁被获取的个数,标记位,在下面exclusiveCount方法使用到,和锁状态即AQS中的属性state进行&操作,获取写锁被获取的个数 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //获取读锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,获取读锁被获取的个数,由于AQS属性state,即锁的状态,高16位存放读锁被获取的个数,为此要获取到读锁被获取的个数需将其state做右移16位,得到读锁被获取的个数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //获取写锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,由于AQS属性state,即锁的状态,低16位存放写锁被获取的个数,为此要获取到写锁被获取的个数需将其&上EXCLUSIVE_MASK即低16位的标识值,得到写锁被获取的个数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //记录线程获取读锁的次数count,和获取读锁的线程tid static final class HoldCounter { //线程获取读锁的次数 int count = 0; //存放获取读锁的当前线程tid final long tid = getThreadId(Thread.currentThread()); } //ThreadLocal实现的类,只是重写了ThreadLocal的initialValue方法,在调用get方法时,返回默认值HoldCounter实例 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重写ThreadLocal的initialValue方法 public HoldCounter initialValue() { //返回当前线程对应的HoldCounter实例 return new HoldCounter(); } } //ThreadLocal实现类的ThreadLocalHoldCounter实例,存放每个线程的HoldCounter实例,HoldCounter可以看上面的介绍 private transient ThreadLocalHoldCounter readHolds; //用来缓存最近一次获取读锁的线程的HoldCounter实例 private transient HoldCounter cachedHoldCounter; //表示第一个获取读锁的线程 private transient Thread firstReader = null; //表示第一个线程获取读锁的获取数 private transient int firstReaderHoldCount; //Sync构造函数,在子类FairSync或者NonFairSync被初始化时,会调用父类Sync的构造函数 Sync() { //实例化存放每个线程的HoldCounter实例的ThreadLocal实现类ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter(); //设置AQS中的state属性,即ReentrantReadWriteLock锁的状态 setState(getState()); } //线程在获取读锁时,是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 abstract boolean readerShouldBlock(); //线程在获取写锁时,是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 abstract boolean writerShouldBlock(); //根据传入进来的要释放写锁被获取的个数尝试释放写锁,在下面WriteLock类中尝试释放锁使用到 protected final boolean tryRelease(int releases) { //isHeldExclusively方法会在下面进行介绍,判断当前线程是否是持有独占锁的线程,如果不是抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) //不是持有独占锁的线程尝试释放独占锁,抛出IllegalMonitorStateException异常 throw new IllegalMonitorStateException(); //getState获取到ReentrantReadWriteLock的锁状态即AQS的state属性,减去要被释放写锁被获取的个数 int nextc = getState() - releases; //可以看上面exclusiveCount方法的介绍,传入锁状态,获取写锁被获取的个数是否等于0 boolean free = exclusiveCount(nextc) == 0; //如果free是true表示独占锁已经被释放 if (free) //将其表示占有独占锁的线程属性exclusiveOwnerThread置为空 setExclusiveOwnerThread(null); //因为独占锁是排它锁,只有当前线程对状态进行修改,不会涉及到并发,可以直接调用setState方法将其锁状态设置为新的锁状态值,即重新设置AQS中的属性state setState(nextc); //返回独占锁是否被释放 return free; } /** * 尝试获取写锁,传入acquires参数,表示要加写锁的获取数 * * @param acquires 获取写锁的个数 * @return true表示获取写锁成功 */ protected final boolean tryAcquire(int acquires) { //获取当前线程 Thread current = Thread.currentThread(); //获取ReentrantReadWriteLock的锁状态即AQS的state属性,getState方法在另一篇AQS源码分析中进行介绍 int c = getState(); //传入ReentrantReadWriteLock的锁状态即AQS的state属性获取写锁被获取的个数 int w = exclusiveCount(c); //如果ReentrantReadWriteLock锁的状态不等于0,表示有锁被获取 if (c != 0) { //如果写锁没有被获取,表明当前锁状态为读锁状态,或者写锁已经被获取但是占有写锁的线程不是当前线程 if (w == 0 || current != getExclusiveOwnerThread()) //返回获取写锁失败 return false; //如果当前线程要获取写锁的个数加上以前已经获取的写锁个数大于写锁被允许获取的最大个数MAX_COUNT,抛出错误Error if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //否则重新赋值锁状态即AQS的state属性,赋值为已获取的写锁个数加上目前要获取的写锁个数 setState(c + acquires); //返回获取写锁成功 return true; } //writerShouldBlock方法表示线程在获取写锁时,是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //目前ReentrantReadWriteLock实例处于无锁状态,有可能写锁被其他线程先获取到 //返回获取写锁失败 return false; //将表示占有写锁的线程属性exclusiveOwnerThread赋值为当前线程 setExclusiveOwnerThread(current); //返回获取写锁成功 return true; } /** * 尝试释放读锁,传入unused参数,此参数没有被使用 * * @param unused 此参数没有被使用到 * @return true表示释放读锁成功 */ protected final boolean tryReleaseShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //如果当前线程是第一个获取读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; //表示第一个线程获取读锁的获取数,如果获取数为1 if (firstReaderHoldCount == 1) //将表示第一个获取读锁的线程,firstReader属性置为空 firstReader = null; else //否则第一个线程获取读锁的获取数做减1操作 firstReaderHoldCount--; } else { //获取最近一次获取读锁的线程的HoldCounter实例缓存 HoldCounter rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) //从readHolds实例中重新获取当前线程的HoldCounter实例 rh = readHolds.get(); //获取当前线程获取读锁的获取数 int count = rh.count; //如果当前线程读锁的获取数小于等于1 if (count <= 1) { //将当前线程的HoldCounter实例移除 readHolds.remove(); //如果当前线程没有拥有读锁,释放读锁会抛出IllegalMonitorStateException异常 if (count <= 0) //可以看下面对unmatchedUnlockException方法的介绍 throw unmatchedUnlockException(); } //当前线程的读锁获取数做减1操作 --rh.count; } //释放锁,死循环修改锁状态,即AQS中的state属性 for (;;) { //获取ReentrantReadWriteLock锁的状态,即AQS的属性state int c = getState(); //因为锁状态AQS属性高16位存放读锁的个数,为此需要减去最小单元数SHARED_UNIT int nextc = c - SHARED_UNIT; //使用CAS更新锁的状态,即AQS的属性nextc if (compareAndSetState(c, nextc)) //锁状态更新成功,并且锁状态为0,表示释放读锁成功 return nextc == 0; } } //如果线程在没有拥有读锁的前提下去释放锁,会抛出IllegalMonitorStateException异常 private IllegalMonitorStateException unmatchedUnlockException() { return new IllegalMonitorStateException( "attempt to unlock read lock, not locked by current thread"); } /** * 尝试获取读锁,传入unused参数,此参数没有被使用 * * @param unused 此参数没有被使用到 * @return 1表示获取读锁成功,-1表示失败,获取读锁成功的获取数 */ protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取ReentrantReadWriteLock锁状态,即AQS的属性state int c = getState(); //如果当前ReentrantReadWriteLock处于写锁状态,并且占有写锁的线程不是当前线程,直接返回-1,表示获取读锁失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //返回-1,获取读锁失败 return -1; //传入ReentrantReadWriteLock锁状态,即AQS的属性state,得到读锁已被获取数 int r = sharedCount(c); //线程在获取读锁时,是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 if (!readerShouldBlock() && //并且当前线程读锁被获取数小于允许的最大数MAX_COUNT r < MAX_COUNT && //并且使用CAS重新设置ReentrantReadWriteLock锁的状态为加上一个读锁的状态 compareAndSetState(c, c + SHARED_UNIT)) { //如果读锁上次的被获取数为0,即当前读锁被第一次获取 if (r == 0) { //将表示是第一个获取读锁的线程,firstReader属性设置为当前线程 firstReader = current; //表示第一个线程获取读锁的获取数赋值为1 firstReaderHoldCount = 1; //如果当前线程是第一个获取读锁的线程 } else if (firstReader == current) { //第一个线程获取读锁的获取数做加1操作 firstReaderHoldCount++; } else { //获取最近一次获取读锁的线程的HoldCounter实例缓存 HoldCounter rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) //从readHolds实例中重新获取当前线程的HoldCounter实例,赋值给rh和cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); //如果当前线程还没有获取过读锁 else if (rh.count == 0) //将当前线程的HoldCounter实例设置到readHolds属性中,readHolds属性可以看上面介绍 readHolds.set(rh); //当前线程的读锁获取数做加1操作 rh.count++; } //返回1,获取读锁成功 return 1; } //看下面对fullTryAcquireShared方法的介绍 return fullTryAcquireShared(current); } /** * 死循环尝试获取读锁,传入current参数,表示当前线程 * * @param current 表示当前线程 * @return 1表示获取读锁成功,-1表示失败,获取读锁成功的获取数 */ final int fullTryAcquireShared(Thread current) { //临时变量HoldCounter为null HoldCounter rh = null; //死循环获取读锁 for (;;) { //获取ReentrantReadWriteLock锁状态,即AQS的属性state int c = getState(); //如果当前ReentrantReadWriteLock处于写锁状态,并且占有写锁的线程不是当前线程,直接返回-1,表示获取读锁失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) //返回-1,获取读锁失败 return -1; //线程在获取读锁时,是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 //如果当前锁FairSync公平锁,不管当前ReentrantReadWriteLock处于读锁状态,还是需等待同步队列(CLH)中前面节点的线程都获取到读锁,当前节点的线程才能获取到读锁 //如果当前锁NonFairSync公平锁,除非头节点的下一节点是要获取写锁的节点,才会阻塞当前线程,否则使用CAS更新锁状态,获取读锁 } else if (readerShouldBlock()) { //如果当前线程是第一个获取读锁的线程,不做任何操作 if (firstReader == current) { } else { if (rh == null) { //获取最近一次获取读锁的线程的HoldCounter实例缓存 rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) { //从readHolds实例中重新获取当前线程的HoldCounter实例 rh = readHolds.get(); //当前线程需要被阻塞,并且没有获取过读锁 if (rh.count == 0) //将其当前线程的HoldConuter实例从readHolds中移除 readHolds.remove(); } } //当前线程需要被阻塞,并且没有获取过读锁 if (rh.count == 0) //返回-1,获取读锁失败 return -1; } } //读锁被获取数等于最大数MAX_COUNT,直接抛出Error if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //并且使用CAS重新设置ReentrantReadWriteLock锁的状态为加上一个读锁(SHARED_UNIT)的状态 if (compareAndSetState(c, c + SHARED_UNIT)) { //传入ReentrantReadWriteLock锁状态,即AQS的属性state,得到读锁已被获取数,如果读锁没有被获取过 if (sharedCount(c) == 0) { //将表示是第一个获取读锁的线程,firstReader属性设置为当前线程 firstReader = current; //表示第一个线程获取读锁的获取数赋值为1 firstReaderHoldCount = 1; //如果当前线程是第一个获取读锁的线程 } else if (firstReader == current) { //第一个线程获取读锁的获取数做加1操作 firstReaderHoldCount++; } else { if (rh == null) //获取最近一次获取读锁的线程的HoldCounter实例缓存 rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) //从readHolds实例中重新获取当前线程的HoldCounter实例 rh = readHolds.get(); else if (rh.count == 0) //将当前线程的HoldCounter实例设置到readHolds属性中,readHolds属性可以看上面介绍 readHolds.set(rh); //当前线程的读锁获取数做加1操作 rh.count++; //当前线程的HoldCounter实例缓存起来,赋值给cachedHoldCounter cachedHoldCounter = rh; // cache for release } //返回1,获取读锁成功 return 1; } } } //尝试获取写锁 final boolean tryWriteLock() { //获取当前线程 Thread current = Thread.currentThread(); //获取ReentrantReadWriteLock锁状态,即AQS的属性state int c = getState(); //如果ReentrantReadWriteLock锁的状态不等于0,表示有锁被获取 if (c != 0) { //传入ReentrantReadWriteLock的锁状态即AQS的state属性获取写锁被获取的个数 int w = exclusiveCount(c); //如果写锁没有被获取,表明当前锁状态为读锁状态,或者写锁已经被获取但是占有写锁的线程不是当前线程 if (w == 0 || current != getExclusiveOwnerThread()) return false; //如果当前写锁被获取数已经等于被允许的最大值MAX_COUNT,直接抛出Error if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } //使用cas更新ReentrantReadWriteLock锁状态,即AQS的属性state,更新为加一个写锁的状态值 if (!compareAndSetState(c, c + 1)) //如果更新状态失败,直接返回false,获取写锁失败 return false; //否则将当前线程设置为拥有写锁的线程,即属性exclusiveOwnerThread设置为当前线程 setExclusiveOwnerThread(current); //返回成功,获取写锁成功 return true; } //死循环尝试获取读锁,直到获取读锁成功,或者获取读锁失败 final boolean tryReadLock() { //获取当前线程 Thread current = Thread.currentThread(); for (;;) { //获取ReentrantReadWriteLock锁状态,即AQS的属性state int c = getState(); //如果当前ReentrantReadWriteLock处于写锁状态,并且占有写锁的线程不是当前线程,直接返回-1,表示获取读锁失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //返回获取读锁失败 return false; //传入ReentrantReadWriteLock锁状态,即AQS的属性state,得到读锁已被获取数 int r = sharedCount(c); //如果当前读锁被获取的个数已经等于被允许的最大值MAX_COUNT,直接抛出Error if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //并且使用CAS重新设置ReentrantReadWriteLock锁的状态为加上一个读锁(SHARED_UNIT)的状态 if (compareAndSetState(c, c + SHARED_UNIT)) { //如果读锁上次的被获取数为0,即当前读锁被第一次获取 if (r == 0) { //将表示是第一个获取读锁的线程,firstReader属性设置为当前线程 firstReader = current; //表示第一个线程获取读锁的获取数赋值为1 firstReaderHoldCount = 1; //如果当前线程是第一个获取读锁的线程 } else if (firstReader == current) { //第一个线程获取读锁的获取数做加1操作 firstReaderHoldCount++; } else { //获取最近一次获取读锁的线程的HoldCounter实例缓存 HoldCounter rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) //从readHolds实例中重新获取当前线程的HoldCounter实例,赋值给rh和cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //将当前线程的HoldCounter实例设置到readHolds属性中,readHolds属性可以看上面介绍 readHolds.set(rh); //当前线程的读锁获取数做加1操作 rh.count++; } //返回获取读锁成功 return true; } } } /** * 判断拥有写锁的线程是否当前线程 * * @return true表示拥有写锁的线程是当前线程 */ protected final boolean isHeldExclusively() { //getExclusiveOwnerThread方法可以看我的另一篇AQS源码分析 return getExclusiveOwnerThread() == Thread.currentThread(); } //创建条件变量Condition实例ConditionObject final ConditionObject newCondition() { //返回ConditionObject实例 return new ConditionObject(); } //如果当前ReentrantReadWriteLock锁状态为写锁状态,即AQS的属性state,返回拥有写锁的当前线程,否则返回null final Thread getOwner() { //返回拥有写锁的线程,如果当前锁状态不处于写锁状态,直接返回空 return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread()); } //获取读锁被获取数 final int getReadLockCount() { //传入ReentrantReadWriteLock锁状态获取读锁被获取数,sharedCount可以看上面的介绍 return sharedCount(getState()); } //判断ReentrantReadWriteLock锁状态,即AQS属性state,是否处于写锁状态 final boolean isWriteLocked() { //exclusiveCount方法,传入锁状态,获取低16位存放写锁的被获取数,写锁被获取数不等于0,表示当前处于写锁状态 return exclusiveCount(getState()) != 0; } //获取写锁的被获取数,只能在拥有写锁的线程中使用 final int getWriteHoldCount() { //isHeldExclusively判断拥有写锁的线程是否当前线程,如果是返回写锁被获取数,否则返回0 return isHeldExclusively() ? exclusiveCount(getState()) : 0; } //获取当前线程读锁的获取数 final int getReadHoldCount() { //如果当前读锁没有被获取,读锁被获取数等于0,直接返回0 if (getReadLockCount() == 0) return 0; //获取当前线程 Thread current = Thread.currentThread(); //如果当前线程是第一个获取读锁的线程 if (firstReader == current) //返回第一个线程获取读锁的获取数 return firstReaderHoldCount; //获取最近一次获取读锁的线程的HoldCounter实例缓存 HoldCounter rh = cachedHoldCounter; //HoldCounter实例缓存为空,或者缓存的HoldCounter实例中tid属性不是当前线程的tid if (rh != null && rh.tid == getThreadId(current))、 //返回当前线程的读锁获取数 return rh.count; //否则的话从readHolds中获取线程对应的HoldCounter实例,得到当前线程的读锁获取数 int count = readHolds.get().count; //如果当前线程的读锁获取数为0,从readHolds中移除当前线程的HoldCounter实例 if (count == 0) readHolds.remove(); //返回当前线程的读锁获取数 return count; } //此方法目前没用到,从对象流中获取对象 private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); readHolds = new ThreadLocalHoldCounter(); setState(0); // reset to unlocked state } //获取ReentrantReadWriteLock锁状态,读写锁被获取的总数 final int getCount() { return getState(); } }
- NonFairSync内部类
//非公平锁,只实现Sync中的两个模板方法writerShouldBlock和readerShouldBlock static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; //线程在获取写锁时,是否需要被阻塞 final boolean writerShouldBlock() { //非公平锁直接返回false return false; } //线程在获取读锁时,是否需要被阻塞 final boolean readerShouldBlock() { //可以看下面对apparentlyFirstQueuedIsExclusive方法的介绍 return apparentlyFirstQueuedIsExclusive(); } } //AbstractQueuedSynchronizer类中的apparentlyFirstQueuedIsExclusive方法 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; //如果同步队列中的头节点的下一个节点是要获取写锁的节点,返回true,表示获取读锁的线程需要阻塞 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
- FairSync内部类
//公平锁,只实现Sync中的两个模板方法writerShouldBlock和readerShouldBlock static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; //线程在获取写锁时,是否需要被阻塞 final boolean writerShouldBlock() { //可以看下面对hasQueuedPredecessors方法的介绍 return hasQueuedPredecessors(); } //线程在获取读锁时,是否需要被阻塞 final boolean readerShouldBlock() { //可以看下面对hasQueuedPredecessors方法的介绍 return hasQueuedPredecessors(); } } //AbstractQueuedSynchronizer类中的hasQueuedPredecessors方法 public final boolean hasQueuedPredecessors() { //获取AQS中的同步队列尾节点 Node t = tail; //获取AQS中的同步队列头节点 Node h = head; Node s; //如果同步队列的头节点的下一节点中的线程不是当前线程,返回true,需要阻塞 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
- ReadLock内部类
//ReadLock和writeLock都实现Lock,下面方法在ReadLock和writeLock中会进行介绍 public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); } //读锁 public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; //读锁的获取和释放都是基于sync private final Sync sync; //传入ReentrantReadWriteLock构造ReadLock实例,因为需要sync,为此需要传入ReentrantReadWriteLock实例 protected ReadLock(ReentrantReadWriteLock lock) { //将sync赋值为lock中的sync sync = lock.sync; } //加读锁,不会抛出中断异常,直到获取读锁成功,在获取读锁的过程中,可能会阻塞,根据前置节点的状态 public void lock() { //acquireShared方法可以看AQS中对此方法的介绍,在下面读锁的获取会进行详细的介绍 sync.acquireShared(1); } //加读锁,在获取读锁的过程中,可能会阻塞,根据前置节点的状态,如果线程在等待获取读锁的过程中,有被中断,唤醒时会抛出中断异常,支持中断异常 public void lockInterruptibly() throws InterruptedException { //acquireSharedInterruptibly方法可以看AQS中对此方法的介绍,在下面读锁的获取会进行详细的介绍 sync.acquireSharedInterruptibly(1); } //死循环尝试获取读锁,直到获取读锁成功,或者获取读锁失败 public boolean tryLock() { //可以看上面对Sync中的tryReadLock方法的介绍 return sync.tryReadLock(); } //尝试在超时时间内获取读锁 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { //tryAcquireSharedNanos方法可以看AQS中对此方法的介绍,在下面对读锁的获取会进行详细的介绍 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //释放读锁 public void unlock() { //releaseShared方法可以看AQS中对此方法的介绍,在下面读锁的释放会进行详细介绍 sync.releaseShared(1); } //读锁不支持Condition public Condition newCondition() { //否则会抛出UnsupportedOperationException throw new UnsupportedOperationException(); } //重写Object的toString方法 public String toString() { //获取读锁被获取数 int r = sync.getReadLockCount(); //返回父类的toString方法加上读锁的被获取数 return super.toString() + "[Read locks = " + r + "]"; } }
- WriteLock内部类
//写锁 public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; //写锁的获取和释放都是基于sync private final Sync sync; //传入ReentrantReadWriteLock构造WriteLock实例,因为需要sync,为此需要传入ReentrantReadWriteLock实例 protected WriteLock(ReentrantReadWriteLock lock) { //将sync赋值为lock中的sync sync = lock.sync; } //加写锁,会阻塞,不支持中断,直到获取写锁为此,等待获取写锁的过程中,可能会阻塞,根据前置节点的状态 public void lock() { //acquire方法可以看AQS中对此方法的介绍,在下面写锁的获取会进行详细的介绍 sync.acquire(1); } //加写锁,会阻塞,在获取写锁的过程中,可能会阻塞,根据前置节点的状态,如果线程在等待获取写锁的过程中,有被中断,唤醒时会抛出中断异常,支持中断异常 public void lockInterruptibly() throws InterruptedException { //acquireInterruptibly方法可以看AQS中对此方法的介绍,在下面写锁的获取会进行详细的介绍 sync.acquireInterruptibly(1); } //尝试获取锁,不会阻塞 public boolean tryLock( ) { //可以看上面对sync.tryWriteLock方法的介绍 return sync.tryWriteLock(); } //尝试在超时时间内获取写锁,会阻塞,在获取写锁的过程中,可能会阻塞,根据前置节点的状态,如果线程在等待获取写锁的过程中,有被中断,唤醒时会抛出中断异常,支持中断异常 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { //tryAcquireNanos方法可以看AQS中对此方法的介绍,在下面写锁的获取会进行详细的介绍 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } //释放写锁 public void unlock() { //release方法可以看AQS中对此方法的介绍,在下面写锁的释放会进行详细的介绍 sync.release(1); } //Condition条件变量只能从WriteLock进行实例化,不能从ReadLock进行实例化 public Condition newCondition() { //调用sync的newCondition方法,newCondition方法可以看上面的介绍 return sync.newCondition(); } public String toString() { //调用sync的getOwner方法,获取占有独占锁的线程 Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } //判断拥有写锁的线程是否当前线程 public boolean isHeldByCurrentThread() { //调用sync的isHeldExclusively方法,isHeldExclusively方法可以看上面的介绍 return sync.isHeldExclusively(); } //获取写锁的被获取数,只能在拥有写锁的线程中使用 public int getHoldCount() { //调用sync的getWriteHoldCount方法,getWriteHoldCount方法可以看上面的介绍 return sync.getWriteHoldCount(); } }
六、独占锁
- 独占锁的获取
//写锁获取的整个链路进行介绍,以下也会介绍AQS中使用到的方法,AQS的详细介绍可以看我的另一篇AQS源码分析 //WriteLock的Lock方法,直到获取写锁成功,不支持中断 public void lock() { //调用sync的acquire方法,acquire是AbstractQueuedSynchronizer类中的方法,由sync继承下来 sync.acquire(1); } //AbstractQueuedSynchronizer中的acquire方法 public final void acquire(int arg) { //tryAcquire方法是个模板方法,AQS中此方法直接抛出UnsupportedOperationException异常,Sync有对AQS中的tryAcquire方法进行实现,Sync的tryAcquire可以看上面介绍 //在调用Sync的tryAcquire方法尝试获取写锁,如果获取写锁失败才会调用AQS中的acquireQueued方法循环进行获取写锁,直到获取写锁成功 if (!tryAcquire(arg) && //addWaiter方法先创建一个要获取写锁的新节点,加入到同步队列中,acquireQueued死循环的获取写锁,直到获取写锁成功 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //addWaiter、acquireQueued、selfInterrupt方法都是AQS中的方法 //重置线程的中断标志位,可以看下面对selfInterrupt方法的介绍 selfInterrupt(); } //AbstractQueuedSynchronizer中的addWaiter方法 //@param mode 要创建节点的模式,是要获取读锁的节点还是获取写锁的节点 private Node addWaiter(Node mode) { //根据当前线程和传入的节点模式创建新节点 Node node = new Node(Thread.currentThread(), mode); //获取同步队列(CLH)的尾节点 Node pred = tail; //如果尾节点不为空 if (pred != null) { //将新建节点的前置节点设置为尾节点 node.prev = pred; //使用CAS将新建节点设置为尾节点 if (compareAndSetTail(pred, node)) { //如果CAS成功,尾节点的下一节点为新建节点 pred.next = node; //返回新建节点 return node; } } //否则调用enq方法进行循环的将新建节点加入同步队列中,做为同步队列的尾节点,详细的可以看enq方法的介绍 enq(node); //返回新建节点 return node; } //AbstractQueuedSynchronizer中的enq方法 private Node enq(final Node node) { //死循环的将传入节点加入到同步队列中,做为同步队列的尾节点,直到节点加入队列成功为止 for (;;) { //获取尾节点 Node t = tail; //如果尾节点为空,表明同步队列不存在节点 if (t == null) { //新建个节点做为同步队列的头节点,使用CAS进行头节点的设置 if (compareAndSetHead(new Node())) //如果头节点设置成功,将尾节点设置为头节点 tail = head; } else {//否则队列不为空 //将新建节点的前置节点设置为尾节点 node.prev = t; //使用CAS将新建节点设置为尾节点 if (compareAndSetTail(t, node)) { //如果CAS成功,尾节点的下一节点为新建节点 t.next = node; //返回新建节点 return t; } } } } //AbstractQueuedSynchronizer中的acquireQueued方法,死循环的获取写锁,直到获取写锁成功为此 //@param node 要获取写锁的节点 //@param arg,要得到写锁的获取数,arg参数在sync中的tryAcquire方法使用到 //@return 在获取写锁的过程中,是否有发生中断请求 final boolean acquireQueued(final Node node, int arg) { //在获取写锁是否失败,如果失败在finally中将节点从同步队列中移除 boolean failed = true; try { //线程在获取写锁阻塞的过程中是否有其他线程发起中断请求,中断请求的标志位 boolean interrupted = false; //死循环的获取写锁,直到获取写锁成功 for (;;) { //获取传入节点的前置节点 final Node p = node.predecessor(); //传入节点的前置节点如果为头节点,调用sync中的重写AQS类中的tryAcquire(arg)方法 if (p == head && tryAcquire(arg)) { //将传入节点设置为同步队列的头节点 setHead(node); //传入节点的前置节点的下一节点设置为空 p.next = null; // help GC //设置获取写锁成功标志位 failed = false; //返回当前线程在获取写锁的过程中是否有发生中断标志位 return interrupted; } //shouldParkAfterFailedAcquire方法在上一次获取写锁失败时,是否需要阻塞,根据当前节点的前置节点状态来判断,详细的可以看下面的介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //parkAndCheckInterrupt方法阻塞当前线程,并且在当前线程被唤醒时,检查当前线程是否被中断,parkAndCheckInterrupt返回中断标志位,详细的可以看下面的介绍 //parkAndCheckInterrupt返回线程在阻塞被唤醒时,线程是否被中断的标志位 interrupted = true; } } finally { //如果获取写锁失败, if (failed) //如果获取写锁失败,从同步队列中移除当前节点,根据当前节点的前置节点状态是否唤醒当前节点的不为空的下一节点线程,cancelAcquire方法可以看下面详细介绍 cancelAcquire(node); } } //AbstractQueuedSynchronizer中的selfInterrupt方法 static void selfInterrupt() { //设置当前线程的中断标志位 Thread.currentThread().interrupt(); } //AbstractQueuedSynchronizer中的shouldParkAfterFailedAcquire方法 //即线程在上一次没有获取到写锁,再次获取写锁时是否需要阻塞 //@param pred 传入节点node的前置节点 //@param node 要获取写锁的节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的状态,如果对AQS状态不清楚的可以看下另一篇AQS源码分析 int ws = pred.waitStatus; //如果前置节点的状态为Node.SIGNAL,SIGNAL状态表示下一节点需要阻塞 if (ws == Node.SIGNAL) //返回当前要获取写锁的节点需要阻塞 return true; //如果当前要获取写锁的节点的前置节点已取消CANCELLED,重新获取有效的前置节点 if (ws > 0) { do { //重新设置要获取写锁节点的前置节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//循环获取前面有效的前置节点,CLH队列一定会有个有效的头节点 //有效的前置节点的下一节点设置为当前要获取写锁的节点 pred.next = node; } else { //否则将前置节点的状态设置为Node.SIGNAL,在下一次循环时,将其要获取写锁的节点阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回不需要阻塞要获取写锁的节点 return false; } //AbstractQueuedSynchronizer中的parkAndCheckInterrupt方法 private final boolean parkAndCheckInterrupt() { //阻塞当前线程,this是监控对象,这样可以知道当前线程是被那个对象阻塞 LockSupport.park(this); //线程被唤醒时,判断在等待的过程中是否有中断请求 return Thread.interrupted(); } //AbstractQueuedSynchronizer中的cancelAcquire方法 //将传入节点从同步队列中移除,如果要移除节点为尾节点,重新设置尾节点,如果要移除节点的前置节点为头节点,唤醒要取消节点的下一节点 //@param node 要取消的节点 private void cancelAcquire(Node node) { //如果传入的要取消节点为空,直接退出 if (node == null) //直接退出 return; //将要取消节点的线程置为空 node.thread = null; //获取要取消节点的前置节点 Node pred = node.prev; //循环获取不是取消的前置节点节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; //获取前置节点的下一个节点,在下面cas需要使用到 Node predNext = pred.next; //将要取消节点的状态设置为取消Node.CANCELLED状态 node.waitStatus = Node.CANCELLED; //如果要取消节点为尾节点,将其尾节点有效的前置节点 if (node == tail && compareAndSetTail(node, pred)) { //将有效的前置节点的下一节点设置为空 compareAndSetNext(pred, predNext, null); } else { //否则的话根据有效的前置节点状态和是否头节点来判断是否需要唤醒要取消节点的下一节点 int ws; //有效的前置节点不是头节点 if (pred != head && //有效的前置节点状态为Node.SIGNAL,或者使用cas将有效的前置节点的状态设置为Node.SIGNAL,并且有效的前置节点的线程不为空 ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //获取要取消节点的下一个节点 Node next = node.next; //要取消节点的下一个节点不为空,并且不是已取消状态 if (next != null && next.waitStatus <= 0) //将要取消节点的下一个节点和有效的前置节点连接起来 compareAndSetNext(pred, predNext, next); } else { //唤醒要取消节点的下一个节点unparkSuccessor方法可以看下面 unparkSuccessor(node); } //将要取消节点的下一个节点设置为自身 node.next = node; } } //AbstractQueuedSynchronizer中的unparkSuccessor方法 //唤醒当前传入节点的下一个不为空并且不是取消的节点线程 //@param node 唤醒传入节点下一节点 private void unparkSuccessor(Node node) { //传入节点的状态 int ws = node.waitStatus; //如果传入节点的状态小于0,即SIGNAL或者PROPAGATE if (ws < 0) //使用CAS将传入节点的状态设置为0 compareAndSetWaitStatus(node, ws, 0); //获取传入节点的下一节点 Node s = node.next; //如果下一节点为空,或者是已取消状态 if (s == null || s.waitStatus > 0) { //将下一节点设置为空 s = null; //从尾节点开始寻找传入节点有效的下一节点(不为空,并且不是已取消状态) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果下一个有效节点不为空,并且不是已取消状态,唤醒此节点对应的线程 if (s != null) //唤醒下一节点的线程 LockSupport.unpark(s.thread); } //WriteLock的lockInterruptibly,和lock方法的差别,只是在获取写锁时,如果线程被中断会抛出中断异常 public void lockInterruptibly() throws InterruptedException { //调用sync从AQS中继承下来的acquireInterruptibly方法,可以看下面对此方法的介绍 sync.acquireInterruptibly(1); } //AbstractQueuedSynchronizer的acquireInterruptibly方法 //@param arg 要获取写锁的获取数 public final void acquireInterruptibly(int arg) throws InterruptedException { //要获取写锁的线程被中断,抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //调用Sync重写AQS的tryAcquire方法,获取写锁,详细的可以看下Sync的tryAcquire方法 if (!tryAcquire(arg)) //如果调用tryAcquire方法失败,调用sync从AQS中继承下来的doAcquireInterruptibly方法 doAcquireInterruptibly(arg); } //AbstractQueuedSynchronizer的doAcquireInterruptibly方法,和上面介绍的acquireQueued方法唯一不同是,在线程Wait被唤醒时,检测线程是否被中断,如果被中断直接抛出中断异常 private void doAcquireInterruptibly(int arg) throws InterruptedException { //根据当前线程和独占模式创建新节点,addWaiter方法可以看上面的介绍 final Node node = addWaiter(Node.EXCLUSIVE); //在获取写锁是否失败,如果失败在finally中将节点从同步队列中移除 boolean failed = true; try { //死循环的获取写锁,直到获取写锁成功或者抛出中断异常 for (;;) { //获取新建节点的前置节点 final Node p = node.predecessor(); //新建节点的前置节点如果为头节点,调用sync中的重写AQS类中的tryAcquire(arg)方法 if (p == head && tryAcquire(arg)) { //将新建节点设置为同步队列的头节点 setHead(node); //新建节点的前置节点的下一节点设置为空 p.next = null; // help GC //设置获取写锁成功标志位 failed = false; //直接退出 return; } //shouldParkAfterFailedAcquire方法在上一次获取写锁失败时,是否需要阻塞,根据当前节点的前置节点状态来判断,详细的可以看上面的介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //parkAndCheckInterrupt方法阻塞当前线程,并且在当前线程被唤醒时,检查当前线程是否被中断,parkAndCheckInterrupt返回中断标志位,详细的可以看上面的介绍 //parkAndCheckInterrupt返回线程在等待被唤醒时,线程是否被中断的标志位 throw new InterruptedException(); } } finally { //如果获取写锁失败, if (failed) //如果获取写锁失败,从同步队列中移除当前节点,根据当前节点的前置节点状态是否唤醒当前节点的不为空的下一节点线程,cancelAcquire方法可以看上面详细介绍 cancelAcquire(node); } } //WriteLock的tryLock方法,尝试获取写锁,获取不到直接返回,不会加入同步队列中 public boolean tryLock() { //调用在Sync介绍的tryWriteLock方法 return sync.tryWriteLock(); }
- 独占锁的释放
//WriteLock中的unlock方法,如果释放写锁的线程不是拥有写锁的线程会抛出IllegalMonitorStateException异常 public void unlock() { //调用sync从AQS中继承下来的release方法 sync.release(1); } //AbstractQueuedSynchronizer中的release方法 public final boolean release(int arg) { //调用sync中的tryRelease方法,释放写锁,如果释放成功 if (tryRelease(arg)) { //获取同步队列中的头节点 Node h = head; //如果头节点不为空,并且状态不是0 if (h != null && h.waitStatus != 0) //获取头节点的下一有效节点(节点不为空,并且状态不是已取消状态) unparkSuccessor(h); //释放写锁成功 return true; } //释放写锁失败 return false; } //AbstractQueuedSynchronizer中的tryRelease方法,模板方法,子类必须重写,否则直接抛出UnsupportedOperationException异常 protected boolean tryRelease(int arg) { //抛出UnsupportedOperationException异常 throw new UnsupportedOperationException(); } //Sync重写AQS的tryRelease模板方法 protected final boolean tryRelease(int releases) { //判断当前线程是否是占有写锁的线程 if (!isHeldExclusively()) //如果释放写锁的线程不是占有写锁的线程,直接抛出IllegalMonitorStateException异常 throw new IllegalMonitorStateException(); //将锁的状态减去要释放的写锁获取数 int nextc = getState() - releases; //如果根据新的锁状态获取写锁数,如果写锁获取数为0,释放写锁成功 boolean free = exclusiveCount(nextc) == 0; if (free) //释放写锁成功,将占有写锁的线程置为空,即AbstractOwnableSynchronizer中exclusiveOwnerThread属性置为空 setExclusiveOwnerThread(null); //设置锁的状态,因为写锁只能有一个线程获取,为此可以直接调用setState方法设置锁状态 setState(nextc); //返回写锁是否释放成功 return free; }
七、共享锁
- 共享锁的获取
//ReadLock的lock方法,获取读锁,直到获取读锁成功,不支持中断 public void lock() { //调用sync从AQS中继承下来的acquireShared方法获取读锁 sync.acquireShared(1); } //AbstractQueuedSynchronizer中的acquireShared方法 public final void acquireShared(int arg) { //调用Sync重写AQS的tryAcquireShared方法,获取读锁,详细的可以看下Sync的tryAcquireShared方法 if (tryAcquireShared(arg) < 0) //如果tryAcquireShared获取读锁失败,调用doAcquireShared进行死循环获取读锁,可以看下面的对此方法的介绍 doAcquireShared(arg); } //AbstractQueuedSynchronizer的doAcquireShared方法 private void doAcquireShared(int arg) { //根据当前线程和共享模式创建新节点,addWaiter方法可以看上面的介绍 final Node node = addWaiter(Node.SHARED); //在获取读锁是否失败,如果失败在finally中将节点从同步队列中移除 boolean failed = true; try { //线程在获取读锁阻塞的过程中是否有其他线程发起中断请求,中断请求的标志位 boolean interrupted = false; for (;;) { //获取新建节点的前置节点 final Node p = node.predecessor(); //新建节点的前置节点如果为头节点 if (p == head) { //调用Sync重写AQS的tryAcquireShared方法,获取读锁,详细的可以看下Sync的tryAcquireShared方法 int r = tryAcquireShared(arg); if (r >= 0) { //将新建节点设置为头节点,根据得到读锁的获取数和头节点及头节点状态判断是否需要唤醒下一个有效节点 setHeadAndPropagate(node, r); //新建节点的前置节点的下一节点设置为空 p.next = null; // help GC //如果当前线程在获取读锁的过程中有被其他线程发起中断请求,重置中断请求标志位 if (interrupted) selfInterrupt(); //设置获取读锁成功标志位 failed = false; //获取读锁成功 return; } } //shouldParkAfterFailedAcquire方法在上一次获取读锁失败时,是否需要阻塞,根据当前节点的前置节点状态来判断,详细的可以看上面的介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //parkAndCheckInterrupt方法阻塞当前线程,并且在当前线程被唤醒时,检查当前线程是否被中断,parkAndCheckInterrupt返回中断标志位,详细的可以看上面的介绍 //parkAndCheckInterrupt返回线程在等待被唤醒时,线程是否被中断的标志位 interrupted = true; } } finally { if (failed) //如果获取读锁失败,从同步队列中移除当前节点,根据当前节点的前置节点状态是否唤醒当前节点的不为空的下一节点线程,cancelAcquire方法可以看上面详细介绍 cancelAcquire(node); } } //ReadLock的lockInterruptibly方法,获取读锁,直到获取读锁成功,或者在获取写锁阻塞被唤醒的时候被中断,抛出中断异常,支持中断 public void lockInterruptibly() throws InterruptedException { //调用sync从AQS中继承下来的acquireSharedInterruptibly方法获取读锁 sync.acquireSharedInterruptibly(1); } //AbstractQueuedSynchronizer的acquireSharedInterruptibly方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //检查线程是否有被中断 if (Thread.interrupted()) //有的话直接抛出中断异常 throw new InterruptedException(); //调用Sync重写AQS的tryAcquireShared方法,获取读锁,详细的可以看下Sync的tryAcquireShared方法 if (tryAcquireShared(arg) < 0) //doAcquireSharedInterruptibly和上面doAcquireShared介绍的区别是线程在获取读锁阻塞被唤醒时,如果有中断请求直接抛出中断异常,详细的可以看AQS源码分析中对此方法的介绍 doAcquireSharedInterruptibly(arg); } //ReaderLock的tryLock方法,不支持中断异常,死循环尝试获取读锁,直到获取成功,或者当前锁的状态是写锁也会退出返回,不会加入同步队列中 public boolean tryLock() { //调用在Sync介绍的tryReadLock方法 return sync.tryReadLock(); } //ReadLock的tryLock方法,支持中断异常 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //AbstractQueuedSynchronizer的tryAcquireSharedNanos方法 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //检查线程是否有被中断 if (Thread.interrupted()) //有的话直接抛出中断异常 throw new InterruptedException(); //调用Sync重写AQS的tryAcquireShared方法,获取读锁,详细的可以看下Sync的tryAcquireShared方法 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); // } //AbstractQueuedSynchronizer的doAcquireSharedNanos,死循环的获取读锁,直到超时、或者获取到写锁 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //如果传入的超时时间小于等于0 if (nanosTimeout <= 0L) //返回获取写锁失败 return false; //当前时间加上超时时间得到死亡时间 final long deadline = System.nanoTime() + nanosTimeout; //根据当前线程和共享模式创建新节点,addWaiter方法可以看上面的介绍 final Node node = addWaiter(Node.SHARED); //在获取读锁是否失败,如果失败在finally中将节点从同步队列中移除 boolean failed = true; try { for (;;) { //获取新建节点的前置节点 final Node p = node.predecessor(); //新建节点的前置节点如果为头节点 if (p == head) { //调用Sync重写AQS的tryAcquireShared方法,获取读锁,详细的可以看下Sync的tryAcquireShared方法 int r = tryAcquireShared(arg); if (r >= 0) { //将新建节点设置为头节点,根据得到读锁的获取数和头节点及头节点状态判断是否需要唤醒下一个有效节点 setHeadAndPropagate(node, r); //新建节点的前置节点的下一节点设置为空 p.next = null; // help GC //设置获取读锁成功标志位 failed = false; //获取读锁成功 return true; } } //死亡时间减去当前时间,得到超时时间 nanosTimeout = deadline - System.nanoTime(); //如果超时时间小于等于0,直接返回获取读锁失败 if (nanosTimeout <= 0L) //返回获取读锁失败 return false; //shouldParkAfterFailedAcquire方法在上一次获取读锁失败时,是否需要阻塞,根据当前节点的前置节点状态来判断,详细的可以看上面的介绍 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //超时时间大于一定的阈值,才会阻塞要获取读锁的线程 //超时阻塞要获取读锁的线程 LockSupport.parkNanos(this, nanosTimeout); //在阻塞等待获取读锁时,有其他线程发起中断请求,直接抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) //如果获取读锁失败,从同步队列中移除当前节点,根据当前节点的前置节点状态是否唤醒当前节点的不为空的下一节点线程,cancelAcquire方法可以看上面详细介绍 cancelAcquire(node); } }
- 共享锁的释放
//ReadLock的unlock方法,释放读锁 public void unlock() { //调用sync从AQS中继承下来的releaseShared方法释放读锁 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //调用sync中的tryReleaseShared方法,释放读锁,如果释放成功,调用Sync从AQS继承下来的doReleaseShared方法 if (tryReleaseShared(arg)) { doReleaseShared(); //释放读锁成功 return true; } //释放读锁失败 return false; } //AbstractQueuedSynchronizer的doReleaseShared方法 private void doReleaseShared() { //死循环修改头节点状态,根据头节点状态是否为SIGNAL状态值,唤醒头节点的下一个有效节点 for (;;) { //获取头节点 Node h = head; //头节点不为空,并且头尾节点不相同 if (h != null && h != tail) { //获取头节点的状态值 int ws = h.waitStatus; //头节点的状态值为Node.SIGNAL if (ws == Node.SIGNAL) { //将头节点的状态值从Node.SIGNAL修改为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果失败重新循环 continue; //唤醒头节点的下一有效 unparkSuccessor(h); } //如果头节点的状态值为0,将头节点的状态值从0修改为Node.PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果失败重新循环 continue; } //如果头节点没有改变,直接退出循环 if (h == head) //退出循环 break; } }
八、其他方法
//判断当前ReentrantReadWriteLock是否处于公平获取锁状态
public final boolean isFair() {
//判断sync是否是FairSync的实例
return sync instanceof FairSync;
}
//获取占有写锁的线程,如果当前不处于写锁状态,直接返回null
protected Thread getOwner() {
//返回占有写锁的线程,getOwner在Sync内部类中有进行介绍,详细的可以看上面
return sync.getOwner();
}
//获取ReentrantReadWriteLock实例的读锁被获取数
public int getReadLockCount() {
//返回读锁被获取数,getReadLockCount在Sync内部类中有进行介绍,详细的可以看上面
return sync.getReadLockCount();
}
//判断ReentrantReadWriteLock实例是否处于写锁状态
public boolean isWriteLocked() {
//返回锁状态是否处于写锁状态,isWriteLocked在Sync内部类中有进行介绍,详细的可以看上面
return sync.isWriteLocked();
}
//判断占有写锁的线程是否是当前线程
public boolean isWriteLockedByCurrentThread() {
//返回占有写锁的线程是否是当前线程,isHeldExclusively在Sync内部类中有进行介绍,详细的可以看上面
return sync.isHeldExclusively();
}
//获取ReentrantReadWriteLock实例的写锁被获取数,只能是占有写锁的线程调用,否则返回0
public int getWriteHoldCount() {
//写锁被获取数,只有占有写锁的线程调用,才能返回写锁的获取数,否则返回0,getWriteHoldCount在Sync内部类中有进行介绍,详细的可以看上面
return sync.getWriteHoldCount();
}
//获取当前线程已获取的读锁数
public int getReadHoldCount() {
//返回当前线程已获取的读锁数,getReadHoldCount在Sync内部类中有进行介绍,详细的可以看上面
return sync.getReadHoldCount();
}
//获取AQS同步队列中所有要获取写锁的节点线程
protected Collection<Thread> getQueuedWriterThreads() {
//getExclusiveQueuedThreads方法可以看AQS中的源码分析,对此方法的介绍,也是从同步队列的尾节点开始往前遍历,获取模式是获取写锁的节点的线程,加入到List集合中
return sync.getExclusiveQueuedThreads();
}
//获取AQS同步队列中所有要获取读锁的节点线程
protected Collection<Thread> getQueuedReaderThreads() {
//getSharedQueuedThreads方法可以看AQS中的源码分析,对此方法的介绍,也是从同步队列的尾节点开始往前遍历,获取模式是获取读锁的节点的线程,加入到List集合中
return sync.getSharedQueuedThreads();
}
//判断AQS中的同步队列是否有节点
public final boolean hasQueuedThreads() {
//返回同步队列是否有节点,hasQueuedThreads方法可以看AQS中的源码分析,对此方法的介绍
return sync.hasQueuedThreads();
}
//根据传入线程,判断线程是否有在AQS的同步队列中
public final boolean hasQueuedThread(Thread thread) {
//返回传入的线程是否有在同步队列中,isQueued方法可以看AQS中的源码分析,对此方法的介绍
return sync.isQueued(thread);
}
//获取AQS队列中有多少节点,返回是估计值,不是准确值,因为同步队列会改变,节点的加入和移除
public final int getQueueLength() {
//获取AQS队列中有多少节点的近似值,getQueueLength方法可以看AQS中的源码分析,对此方法的介绍
return sync.getQueueLength();
}
//获取AQS同步队列中的所有等待节点(包括获取读锁和写锁节点)对应的不为空的线程
protected Collection<Thread> getQueuedThreads() {
//getQueuedThreads方法可以看AQS中的源码分析,对此方法的介绍
return sync.getQueuedThreads();
}
//传入condition条件变量,获取条件变量中的条件队列是否有等待节点,节点的状态值为CONDITION
public boolean hasWaiters(Condition condition) {
//传入条件变量为空,抛出空指针异常
if (condition == null)
throw new NullPointerException();
//如果传入条件变量不是AQS的ConditionObject实例,抛出IllegalArgumentException
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
//hasWaiters方法内部也是会校验Condition是否有ReentrantReadWriteLock实例中的Sync创建出来,如果不是抛出IllegalArgumentException,如果是调用Condtion的hasWaiters方法,也只能是占有写锁的线程调用此方法,从条件队列的尾节点循环寻找是否有节点不为空,并且状态值为CONDITION,hasWaiters方法可以看AQS中的源码分析,对此方法的介绍
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
//传入condition条件变量,获取条件变量中的条件队列中有多少个等待节点,即多少个节点的状态值为CONDITION
public int getWaitQueueLength(Condition condition) {
//传入条件变量为空,抛出空指针异常
if (condition == null)
throw new NullPointerException();
//如果传入条件变量不是AQS的ConditionObject实例,抛出IllegalArgumentException
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
//getWaitQueueLength方法内部也是会校验Condition是否有ReentrantReadWriteLock实例中的Sync创建出来,如果不是抛出IllegalArgumentException,也只能是占有写锁的线程调用此方法,如果是调用Condtion的getWaitQueueLength方法,从条件队列的尾节点循环寻找是否有节点不为空,并且状态值为CONDITION,getWaitQueueLength方法可以看AQS中的源码分析,对此方法的介绍
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
//传入condition条件变量,获取条件变量中的条件队列中所有等待节点(状态值为CONDITION)的线程
protected Collection<Thread> getWaitingThreads(Condition condition) {
//传入条件变量为空,抛出空指针异常
if (condition == null)
throw new NullPointerException();
//如果传入条件变量不是AQS的ConditionObject实例,抛出IllegalArgumentException
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
//getWaitingThreads方法内部也是会校验Condition是否有ReentrantReadWriteLock实例中的Sync创建出来,如果不是抛出IllegalArgumentException,也只能是占有写锁的线程调用此方法,如果是调用Condtion的getWaitingThreads方法,从条件队列的尾节点循环寻找是否有节点不为空,并且状态值为CONDITION的线程加入集合中,getWaitingThreads方法可以看AQS中的源码分析,对此方法的介绍
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public String toString() {
//得到锁的状态
int c = sync.getCount();
//根据锁的状态得到写锁被获取数
int w = Sync.exclusiveCount(c);
//根据锁的状态得到读锁被获取数
int r = Sync.sharedCount(c);
return super.toString() +
"[Write locks = " + w + ", Read locks = " + r + "]";
}
//传入线程,使用UnSafe根据Thread属性tid相对偏移量获取线程的tid,不清楚的可以看我分享的UnSafe使用
static final long getThreadId(Thread thread) {
//返回线程Thread属性tid值
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}