分布式锁实现汇总

5,096 阅读10分钟

[TOC]

分布式锁实现汇总

很多时候我们需要保证同一时间一个方法只能被同一个线程调用,在单机环境中,Java中其实提供了很多并发处理相关的API,但是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。

针对分布式锁的实现目前有多种方案:

  • 基于数据库实现分布式锁
  • 基于缓存(redis,memcached)实现分布式锁
  • 基于Zookeeper实现分布式锁

基于数据库实现分布式锁

简单实现

直接建一张表,里面记录锁定的方法名 时间 即可。
需要加锁时,就插入一条数据,释放锁时就删除数据。

CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

当我们想要锁住某个方法时,执行以下SQL:

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为
操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

delete from methodLock where method_name ='method_name'
存在的问题
  1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  2. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  3. 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  4. 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
解决办法
  1. 单点问题可以用多数据库实例,同时塞N个表,N/2+1个成功就任务锁定成功
  2. 写一个定时任务,隔一段时间清除一次过期的数据。
  3. 写一个while循环,不断的重试插入,直到成功。
  4. 在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
总结

数据库实现分布式锁的优点: 直接借助数据库,容易理解。

数据库实现分布式锁的缺点: 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。

操作数据库需要一定的开销,性能问题需要考虑。

基于缓存实现分布式锁

相比于用数据库来实现分布式锁,基于缓存实现的分布式锁的性能会更好一些。

目前有很多成熟的分布式产品,包括Redis、memcache、Tair等。

单点实现
步骤
  1. 获取锁的使用,使用setnx加锁,将值设为当前的时间戳,再使用expire设置一个过期值。
  2. 获取到锁则执行同步代码块,没获取则根据业务场景可以选择自旋、休眠、或做一个等待队列等拥有锁进程来唤醒(类似Synchronize的同步队列),在等待时使用ttl去检查是否有过期值,如果没有则使用expire设置一个。
  3. 执行完毕后,先根据value的值来判断是不是自己的锁,如果是的话则删除,不是则表明自己的锁已经过期,不需要删除。(此时出现由于过期而导致的多进程同时拥有锁的问题)
存在的问题
  1. 单点问题。如果单机redis挂掉了,那么程序会跟着出错
  2. 如果转移使用 slave节点,复制不是同步复制,会出现多个程序获取锁的情况
code
    public Object around(ProceedingJoinPoint joinPoint) {
        try {
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            Method method = methodSignature.getMethod();

            DLock dLock = method.getAnnotation(DLock.class);
            if (dLock != null) {
                String lockedPrefix = buildLockedPrefix(dLock, method, joinPoint.getArgs());
                long timeOut = dLock.timeOut();
                int expireTime = dLock.expireTime();
                long value = System.currentTimeMillis();
                if (lock(lockedPrefix, timeOut, expireTime, value)) {
                    try {
                        return joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    } finally {
                        unlock(lockedPrefix, value);
                    }
                } else {
                    recheck(lockedPrefix, expireTime);
                }

            }
        } catch (Exception e) {
            logger.error("DLockAspect around error", e);
        }
        return null;
    }

    /**
     * 检查是否设置过超时
     *
     * @param lockedPrefix
     * @param expireTime
     */
    public void recheck(String lockedPrefix, int expireTime) {
        try {
            Result<Long> ttl = cacheFactory.getFactory().ttl(getLockedPrefix(lockedPrefix));
            if (ttl.isSuccess() && ttl.getValue() == -1) {
                Result<String> get = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
                //没有超时设置则设置超时
                if (get.isSuccess() && !StringUtils.isEmpty(get.getValue())) {
                    long oldTime = Long.parseLong(get.getValue());
                    long newTime = expireTime * 1000 - (System.currentTimeMillis() - oldTime);
                    if (newTime < 0) {
                        //已过超时时间  设默认最小超时时间
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), MIX_EXPIRE_TIME);
                    } else {
                        //未超过  设置为剩余超时时间
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), (int) newTime);
                    }
                    logger.info(lockedPrefix + "recheck:" + newTime);
                }
            }
            logger.info(String.format("执行失败lockedPrefix:%s count:%d", lockedPrefix, count++));
        } catch (Exception e) {
            logger.error("DLockAspect recheck error", e);
        }

    }
        public boolean lock(String lockedPrefix, long timeOut, int expireTime, long value) {
        long millisTime = System.currentTimeMillis();
        try {
            //在timeOut的时间范围内不断轮询锁
            while (System.currentTimeMillis() - millisTime < timeOut * 1000) {
                //锁不存在的话,设置锁并设置锁过期时间,即加锁
                Result<Long> result = cacheFactory.getFactory().setnx(getLockedPrefix(lockedPrefix), String.valueOf(value));
                if (result.isSuccess() && result.getValue() == 1) {

                    Result<Long> result1 = cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), expireTime);
                    logger.info(lockedPrefix + "locked and expire " + result1.getValue());
                    return true;
                }
                //短暂休眠,避免可能的活锁
                Thread.sleep(100, RANDOM.nextInt(50000));
            }
        } catch (Exception e) {
            logger.error("lock error " + getLockedPrefix(lockedPrefix), e);
        }

        return false;
    }

    public void unlock(String lockedPrefix, long value) {
        try {
            Result<String> result = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
            String kvValue = result.getValue();
            if (!StringUtils.isEmpty(kvValue) && kvValue.equals(String.valueOf(value))) {
                cacheFactory.getFactory().del(getLockedPrefix(lockedPrefix));

            }
            logger.info(lockedPrefix + "unlock:" + kvValue + "----" + value);
        } catch (Exception e) {
            logger.error("unlock error" + getLockedPrefix(lockedPrefix), e);
        }
    }
RedLock

Redlock是Redis的作者antirez给出的集群模式的Redis分布式锁,它基于N个完全独立的Redis节点(通常情况下N可以设置成5)。

步骤
  1. 获取当前时间(毫秒数)。
  2. 按顺序依次向N个Redis节点执行获取锁的操作。这个获取操作跟前面基于单Redis节点的获取锁的过程相同,包含随机字符串my_random_value,也包含过期时间(比如PX 30000,即锁的有效时间)。为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(time out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个Redis节点获取锁失败以后,应该立即尝试下一个Redis节点。这里的失败,应该包含任何类型的失败,比如该Redis节点不可用,或者该Redis节点上的锁已经被其它客户端持有(注:Redlock原文中这里只提到了Redis节点不可用的情况,但也应该包含其它的失败情况)。
  3. 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。如果客户端从大多数Redis节点(>= N/2+1)成功获取到了锁,并且获取锁总共消耗的时间没有超过锁的有效时间(lock validity time),那么这时客户端才认为最终获取锁成功;否则,认为最终获取锁失败。
  4. 如果最终获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
  5. 如果最终获取锁失败了(可能由于获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis节点发起释放锁的操作。
优化

客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住);节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了;节点C重启后,客户端2锁住了C, D, E,获取锁成功。客户端1和客户端2同时获得了锁(针对同一资源)。

这个问题可以延迟节点的恢复时间,时间长度应大于等于一个锁的过期时间。

存在的问题
  1. 时钟发生跳跃。这种情况发生时,直接导致所有的锁都超时,新的线程可以成功的获取锁,导致多线程同时处理。

关于RedLock的更多内容可以看:

基于Redis的分布式锁真的安全吗?(上)

基于Redis的分布式锁真的安全吗?(下)

一个比较好的实现:

Redission

Zookeeper锁

步骤
  1. 每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
  2. 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
  3. 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
优点
  1. 无单点问题。ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

  2. 持有锁任意长的时间,可自动释放锁。使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。这避免了基于Redis的锁对于有效时间(lock validity time)到底设置多长的两难问题。实际上,基于ZooKeeper的锁是依靠Session(心跳)来维持锁的持有状态的,而Redis不支持Sesion。

  3. 可阻塞。使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。

  4. 可重入。客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。

问题
  1. 这种做法可能引发羊群效应,从而降低锁的性能。
  2. 性能不如缓存。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

一个比较好的实现:

Curator

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    try {
        return interProcessMutex.acquire(timeout, unit);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return true;
}
public boolean unlock() {
    try {
        interProcessMutex.release();
    } catch (Throwable e) {
        log.error(e.getMessage(), e);
    } finally {
        executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
    }
    return true;
}

acquire方法用户获取锁,release方法用于释放锁。

总结

使用Zookeeper实现分布式锁的优点: 有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。

使用Zookeeper实现分布式锁的缺点 : 性能上不如使用缓存实现分布式锁。 需要对ZK的原理有所了解。

三种方案的比较

从理解的难易程度角度(从低到高): 数据库 > 缓存 > Zookeeper

从实现的复杂性角度(从低到高): Zookeeper >= 缓存 > 数据库

从性能角度(从高到低): 缓存 > Zookeeper >= 数据库

从可靠性角度(从高到低): Zookeeper > 缓存 > 数据库