Redisson单进程Redis分布式悲观锁的使用与实现
本文基于Redisson 3.7.5
1. 可重入锁(Reentrant Lock)
这种锁的使用方式和Java本身框架中的Reentrant Lock一模一样
RLock lock = redisson.getLock("testLock");
try{
// 1. 最常见的使用方法
//lock.lock();
// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
//lock.lock(10, TimeUnit.SECONDS);
// 3. 尝试加锁,最多等待3秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
if(res){ //成功
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
我们来看看底层实现,首先看看RLock接口:
该接口主要继承了Lock接口还有其他Redisson, 并扩展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用来设置锁的过期时间, 如果超过leaseTime还没有解锁的话, redis就强制解锁. leaseTime的默认时间是30s
redisson.getLock("anyLock");
的源代码是:
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
可以看出,可重入锁的实现类就是RedissonLock。
1.1 可重入锁上锁源码
RedissonLock的主要构成:
public class RedissonLock extends RedissonExpirable implements RLock {
private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
//对于不过期的锁,redisson也会每隔一段时间设置一个默认的内部锁过期时间(就是下面的internalLockLeaseTime),这是个定时任务,只要还持有锁就会一直刷新这个过期时间,防止进程死掉后锁一直不释放
private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();
//内部锁过期时间
protected long internalLockLeaseTime;
//UUID,在Redisson初始化的时候生成
final UUID id;
//公共PUBLISH和SUBSCRIBE消息处理类
protected static final LockPubSub PUBSUB = new LockPubSub();
//命令执行代理
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
//对于同一个Lock的控制Entry
protected String getEntryName() {
return id + ":" + getName();
}
//PUBLISH和SUBSCRIBE的Redis channel名称
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
//本次锁名称,就是上锁时HASH里面的KEY
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
}
在redis中的组成包括:
- 一个HASH(HASH的名字就是获取的锁的命名域,就是redisson.getLock("testLock")
的参数testLock),HASH里面有一个键值对,KEY为当前上锁的唯一标识(由Redisson初始化的时候生成的全局唯一ID+当前线程ID组成,就是getLockName
方法的返回)。VALUE就是上锁次数(因为可重入)。
- 一个订阅发布通道,名字是getChannelName
方法的返回,这里就是就是redisson_lock__channel:{testLock}
上锁核心逻辑:
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//尝试获取锁,如果成功就会返回null
Long ttl = tryAcquire(leaseTime, unit, threadId);
//如果ttl为null,代表获取成功,返回
if (ttl == null) {
return;
}
//如果获取失败,则需要订阅到对应这个锁的channel来及时被唤醒重新尝试抢锁
RFuture<RedissonLockEntry> future = subscribe(threadId);
//subscribe方法返回的是异步结果Future,通过commandExecutor来同步
commandExecutor.syncSubscription(future);
try {
while (true) {
//再次尝试获取
ttl = tryAcquire(leaseTime, unit, threadId);
//如果ttl为null,代表获取成功,停止循环
if (ttl == null) {
break;
}
if (ttl >= 0) {
//如果ttl大于零,则等待ttl么多时间之后继续尝试获取(通过死循环继续尝试)
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//否则,证明一直不过期,等待释放
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//无论怎样,都取消对于这个channel的订阅
unsubscribe(future, threadId);
}
}
尝试获取锁tryAcquireAsync的核心逻辑:
Redisson对于永久锁(就是不带过期时间的锁)处理比较特殊,并不是真的永久。而是先设置一个内部锁过期时间internalLockLeaseTime,之后每过三分之内部锁过期时间之后刷新这个锁的过期时间为internalLockLeaseTime。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果过期时间不为永远不过期,就按照普通的方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//否则,在获取锁之后,需要加上定时任务,给锁设置一个内部过期时间,并不断刷新这个时间直到释放锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// ttlRemaining不为null的话代表还持有这个锁
if (ttlRemaining == null) {
//定时刷新该锁的内部锁更新时间
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
执行lua脚本tryLockInnerAsync获取锁的逻辑:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果不存在,证明可以上锁
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果已存在,判断当前获取锁的是不是本线程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//已存在并且获取锁的不是自己,就返回锁过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
1.2 可重入锁解锁源码
解锁逻辑比较简单
public void unlock() {
//解锁
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
//返回null代表不是当前线程获取锁,需要报警
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
//如果返回为true,证明已经释放锁,可以取消非过期锁刷新过期时间的定时任务了
if (opStatus) {
cancelExpirationRenewal();
}
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果锁已经不存在,直接发布锁已释放的消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果锁存在但是不是当前线程占用了这个锁,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//释放一次锁,倘若剩余次数大于0,刷新锁过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则,代表锁已经被释放,删除这个key并发布锁被释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}