Redis系列-生产应用篇-分布式锁(2)-单进程Redis分布式锁的Java实现(Redisson使用与底层实现)-可重入锁

1,978 阅读5分钟
原文链接: blog.csdn.net

Redisson单进程Redis分布式悲观锁的使用与实现

本文基于Redisson 3.7.5

1. 可重入锁(Reentrant Lock)

这种锁的使用方式和Java本身框架中的Reentrant Lock一模一样

RLock lock = redisson.getLock("testLock");
try{
    // 1. 最常见的使用方法
    //lock.lock();

    // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
    //lock.lock(10, TimeUnit.SECONDS);

    // 3. 尝试加锁,最多等待3秒,上锁以后10秒自动解锁
    boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
    if(res){    //成功
        // do your business

    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    lock.unlock();
}

我们来看看底层实现,首先看看RLock接口:
image

该接口主要继承了Lock接口还有其他Redisson, 并扩展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用来设置锁的过期时间, 如果超过leaseTime还没有解锁的话, redis就强制解锁. leaseTime的默认时间是30s

redisson.getLock("anyLock");的源代码是:

@Override
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

可以看出,可重入锁的实现类就是RedissonLock。

1.1 可重入锁上锁源码

RedissonLock的主要构成:


public class RedissonLock extends RedissonExpirable implements RLock {
    private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);

    //对于不过期的锁,redisson也会每隔一段时间设置一个默认的内部锁过期时间(就是下面的internalLockLeaseTime),这是个定时任务,只要还持有锁就会一直刷新这个过期时间,防止进程死掉后锁一直不释放
    private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();
    //内部锁过期时间
    protected long internalLockLeaseTime;
    //UUID,在Redisson初始化的时候生成
    final UUID id;
    //公共PUBLISH和SUBSCRIBE消息处理类
    protected static final LockPubSub PUBSUB = new LockPubSub();
    //命令执行代理
    final CommandAsyncExecutor commandExecutor;

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    }

    //对于同一个Lock的控制Entry
    protected String getEntryName() {
        return id + ":" + getName();
    }

    //PUBLISH和SUBSCRIBE的Redis channel名称
    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

    //本次锁名称,就是上锁时HASH里面的KEY
    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }
}

在redis中的组成包括:
image
- 一个HASH(HASH的名字就是获取的锁的命名域,就是redisson.getLock("testLock")的参数testLock),HASH里面有一个键值对,KEY为当前上锁的唯一标识(由Redisson初始化的时候生成的全局唯一ID+当前线程ID组成,就是getLockName方法的返回)。VALUE就是上锁次数(因为可重入)。
- 一个订阅发布通道,名字是getChannelName方法的返回,这里就是就是redisson_lock__channel:{testLock}

上锁核心逻辑:
image

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}


@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //尝试获取锁,如果成功就会返回null
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //如果ttl为null,代表获取成功,返回
    if (ttl == null) {
        return;
    }

    //如果获取失败,则需要订阅到对应这个锁的channel来及时被唤醒重新尝试抢锁
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    //subscribe方法返回的是异步结果Future,通过commandExecutor来同步
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            //再次尝试获取
            ttl = tryAcquire(leaseTime, unit, threadId);
            //如果ttl为null,代表获取成功,停止循环
            if (ttl == null) {
                break;
            }

            if (ttl >= 0) {
                //如果ttl大于零,则等待ttl么多时间之后继续尝试获取(通过死循环继续尝试)
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                //否则,证明一直不过期,等待释放
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        //无论怎样,都取消对于这个channel的订阅
        unsubscribe(future, threadId);
    }
}

尝试获取锁tryAcquireAsync的核心逻辑:
Redisson对于永久锁(就是不带过期时间的锁)处理比较特殊,并不是真的永久。而是先设置一个内部锁过期时间internalLockLeaseTime,之后每过三分之内部锁过期时间之后刷新这个锁的过期时间为internalLockLeaseTime。
image

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    //如果过期时间不为永远不过期,就按照普通的方式获取锁
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //否则,在获取锁之后,需要加上定时任务,给锁设置一个内部过期时间,并不断刷新这个时间直到释放锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // ttlRemaining不为null的话代表还持有这个锁
            if (ttlRemaining == null) {
                //定时刷新该锁的内部锁更新时间
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

执行lua脚本tryLockInnerAsync获取锁的逻辑:
image

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              //如果不存在,证明可以上锁
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //如果已存在,判断当前获取锁的是不是本线程
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //已存在并且获取锁的不是自己,就返回锁过期时间
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

1.2 可重入锁解锁源码

解锁逻辑比较简单
image

public void unlock() {
    //解锁
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    //返回null代表不是当前线程获取锁,需要报警
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    //如果返回为true,证明已经释放锁,可以取消非过期锁刷新过期时间的定时任务了
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

image

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //如果锁已经不存在,直接发布锁已释放的消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //如果锁存在但是不是当前线程占用了这个锁,返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //释放一次锁,倘若剩余次数大于0,刷新锁过期时间
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //否则,代表锁已经被释放,删除这个key并发布锁被释放的消息
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}