Exclusive Locks and Shared Locks in ZooKeeper


Preface

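I have been reading the book From Paxos to ZooKeeper (《从Paxos到ZooKeeper》) lately, and I find that it explains everything from distributed-systems theory to ZooKeeper's design and practice in a very accessible way. It has helped a beginner like me a great deal in learning ZooKeeper. Following the book's description of distributed exclusive locks and distributed shared locks, this article implements a simple demo of each. Curator already ships production-ready distributed locks; the code here is only a rough sketch of the ideas.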

Basic concepts

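  • A distributed lock is a way to synchronize access to shared resources across the nodes of a distributed system.
  • An exclusive lock is also called a write lock or an X lock. If transaction T1 holds an exclusive lock on data object O1, then for as long as the lock is held only T1 may read and update O1; no other transaction may perform any operation on O1 until T1 releases the lock.
  • A shared lock is also called a read lock. If transaction T1 holds a shared lock on data object O1, then T1 may only read O1, and other transactions may only acquire shared locks on O1, until every shared lock on O1 has been released.
  • The difference between the two: once an exclusive lock is held, the data object is visible to a single transaction only, whereas under a shared lock the data remains visible to all transactions.
  • A ZooKeeper lock can be thought of as an ephemeral znode; where needed it can be an ephemeral sequential node (EPHEMERAL_SEQUENTIAL).
  • Curator is an open-source ZooKeeper client that takes care of much of the low-level detail of ZooKeeper client development. As the saying goes: "Guava is to Java what Curator is to ZooKeeper".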
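To make the two lock modes concrete before moving to ZooKeeper, here is a minimal single-JVM sketch using the JDK's ReentrantReadWriteLock. It is only an analogy for the semantics described above, not a distributed lock, and the class and method names (ReadWriteModes, otherThreadCanAcquire) are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;

// Hypothetical helper for illustration; not part of the original demo.
final class ReadWriteModes {

    /**
     * Holds {@code held} on the calling thread and reports whether a second
     * thread can acquire {@code wanted} right away without blocking.
     */
    static boolean otherThreadCanAcquire(Lock held, Lock wanted) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        held.lock();
        try {
            // The lambda returns a value, so it is submitted as a Callable<Boolean>.
            return executor.submit(() -> {
                boolean acquired = wanted.tryLock();
                if (acquired) {
                    wanted.unlock();
                }
                return acquired;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            held.unlock();
            executor.shutdown();
        }
    }
}
```

Given a ReentrantReadWriteLock rw, only the combination of rw.readLock() held and rw.readLock() wanted should return true; any combination involving rw.writeLock() should return false, which mirrors the shared/exclusive rules above.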

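Note: the source code in this article is built on Spring Boot with Curator integrated; for the integration itself, please see the referenced blog post.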

Exclusive lock

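  • The lock is defined as an ephemeral node, for example /exclusive_lock/lock (any ZooKeeper path will do).
  • Acquiring the lock: all clients try to create the /exclusive_lock/lock node by calling the create() interface. ZooKeeper guarantees that only one client succeeds, and that client is considered to hold the lock.
  • Releasing the lock: as mentioned, /exclusive_lock/lock is an ephemeral node, so the lock may be released in either of the following cases.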

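      • The client machine currently holding the lock crashes, and the ephemeral node is deleted.
      • After finishing its business logic normally, the client deletes the ephemeral node it created.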

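  • Notification: the clients use the watch mechanism to watch the /exclusive_lock node, so whenever its children change they are notified, and on receiving the notification they repeat the "acquire the lock" flow.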
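The acquire/release rules above can be sketched without a ZooKeeper server at all. The following is an in-memory stand-in, assuming that ConcurrentMap.putIfAbsent plays the role of ZooKeeper's atomic create() and that removing a client's entries plays the role of ephemeral-node cleanup on session expiry. The class InMemoryExclusiveLock and its methods are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for the ZooKeeper server; hypothetical, for illustration only.
final class InMemoryExclusiveLock {

    // Maps a lock path to the id of the client (session) that created the node.
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Mimics create(): only the first client to create the node succeeds. */
    boolean tryAcquire(String path, String clientId) {
        return nodes.putIfAbsent(path, clientId) == null;
    }

    /** Mimics the owner deleting its node after finishing its work. */
    boolean release(String path, String clientId) {
        // Removes the entry only if it is still owned by this client.
        return nodes.remove(path, clientId);
    }

    /** Mimics a crash: every ephemeral node owned by the client disappears. */
    void expireSession(String clientId) {
        nodes.values().removeIf(clientId::equals);
    }
}
```

In this sketch a second client's tryAcquire on the same path keeps returning false until the owner calls release or its session is expired, which is exactly the moment the real implementation relies on a watch notification to retry.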

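import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component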
public class ExclusiveLockByCurator implements InitializingBean, ILockByCurator {

    private static final Logger LOGGER = LoggerFactory.getLogger(ExclusiveLockByCurator.class);

    private static final String ROOT_LOCK_PATH = "exclusive_lock";

    // volatile: reassigned by the acquiring thread and counted down by the listener thread
    private volatile CountDownLatch countDownLatch = new CountDownLatch(1);

    private final CuratorFramework curatorFramework;

    public ExclusiveLockByCurator(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework.usingNamespace("lock-namespace");
    }

    @Override
    public void acquireDistributedLock(String path) {
        String keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
        while (true) {
            try {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(keyPath);
                LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] success to acquire lock for path: {}", keyPath);
                break;
            } catch (Exception e) {
                LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] failed to acquire lock for path: {}", keyPath);
                LOGGER.info("try again... ...");
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);
                    }
                    countDownLatch.await();
                } catch (Exception e1) {
                    LOGGER.error("[ExclusiveLockByCurator acquireDistributedLock] error", e1);
                }
            }
        }
    }

    @Override
    public boolean releaseDistributedLock(String path) {
        try {
            String keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                curatorFramework.delete().forPath(keyPath);
            }
        } catch (Exception e) {
            LOGGER.error("[ExclusiveLockByCurator releaseDistributedLock] failed to release lock", e);
            return false;
        }
        return true;
    }

    private void addWatcher(String path) throws Exception {
        String keyPath;
        if (path.equals(ROOT_LOCK_PATH)) {
            keyPath = "/" + path;
        } else {
            keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
        }
        final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener((curatorFramework, event) -> {
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                String oldPath = event.getData().getPath();
                LOGGER.info("[ExclusiveLockByCurator addWatcher] lock released for path: {}", oldPath);
                if (oldPath.contains(path)) {
                    // A child node was deleted: wake up the waiting clients
                    countDownLatch.countDown();
                }
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        String path = "/" + ROOT_LOCK_PATH;
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
            // Start watching right after initialization
            addWatcher(ROOT_LOCK_PATH);
            LOGGER.info("[ExclusiveLockByCurator afterPropertiesSet] root path add watcher success");
        } catch (Exception e) {
            LOGGER.error("[ExclusiveLockByCurator afterPropertiesSet] connect zookeeper fail", e);
        }
    }
}

Shared lock

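  • Lock definition: a lock is still represented by an ephemeral node, but a shared lock is an ephemeral sequential node named /shared_lock/[Hostname]-<request type>-<sequence number>, for example /shared_lock/192.168.0.1-R-0000000001.
  • Acquiring the lock: every client creates an ephemeral sequential node of the above format under the /shared_lock path.
  • Determining read/write order: a shared lock allows different transactions to read the same data at the same time, while an update may only proceed when no other read or write is in progress.
  • The condition for successfully acquiring the lock therefore differs for read requests and write requests.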

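      • Read request: there is no node with a smaller sequence number, or every child node with a smaller sequence number is a read request.
      • Write request: there is no node with a smaller sequence number.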
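The two acquisition rules can be written as a small pure function over the child list. This is a minimal sketch, assuming node names end in "<type>-<sequence>" with "R" for reads and "W" for writes (the type strings are my assumption; the demo takes them from its own OpType enum). SharedLockRules and its methods are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration; not part of the original demo.
final class SharedLockRules {

    /** Sequence suffix of a node name such as "R-0000000003". */
    static long sequenceOf(String node) {
        return Long.parseLong(node.substring(node.lastIndexOf('-') + 1));
    }

    /** Assumes the token before the sequence number is "W" for write requests. */
    static boolean isWrite(String node) {
        String[] parts = node.split("-");
        return parts.length >= 2 && "W".equals(parts[parts.length - 2]);
    }

    /** Returns true if {@code self} holds the lock given the current children. */
    static boolean canAcquire(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SharedLockRules::sequenceOf));
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("unknown node: " + self);
        }
        if (isWrite(self)) {
            // A write needs to be the very first node.
            return index == 0;
        }
        // A read only needs every earlier node to be a read as well.
        for (int i = 0; i < index; i++) {
            if (isWrite(sorted.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

For children R-0000000001, R-0000000002, W-0000000003, R-0000000004, the first two reads hold the lock together, while the write and the read queued behind it both have to wait.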

  • Releasing the lock works the same way as for the exclusive lock.

For notification there are two approaches, which are explained in turn below together with the "herd effect".

watch /shared_lock

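  • This is no different from the exclusive lock: any change to the children of /shared_lock is pushed to all other clients, which then re-fetch the list of children under /shared_lock and work out whether it is their turn to read or update.
  • As a result, throughout the lock competition the two operations "Watcher notification" and "fetch child list" are repeated in large numbers.
  • Clients needlessly receive many event notifications that have nothing to do with them. In a large cluster this hurts performance, and if several clients finish or abort their transactions at the same moment so that their nodes disappear together, the ZooKeeper server sends a burst of event notifications to all remaining clients within a short time. This is the herd effect.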
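To get a feel for the scale of the problem, here is a rough back-of-the-envelope sketch. It assumes a simplified scenario that is not from the book: n clients queue up, they hold and release the lock one at a time, and only node deletions are counted as notifications. HerdEffectEstimate is a hypothetical name.

```java
// Rough estimate only; hypothetical helper, not part of the original demo.
final class HerdEffectEstimate {

    /** Every release is pushed to all clients still waiting under the parent. */
    static long notificationsWatchingParent(int clients) {
        long total = 0;
        for (int released = 1; released <= clients; released++) {
            // After the released-th client leaves, clients - released are still waiting.
            total += clients - released;
        }
        return total;
    }

    /** Every release wakes at most the single client queued right behind it. */
    static long notificationsWatchingPredecessor(int clients) {
        return Math.max(0, clients - 1);
    }
}
```

Under these assumptions the first strategy grows quadratically, n(n-1)/2 in total, while a strategy that notifies only the next client in line grows linearly.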

Optimized watch

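With the "herd effect" in mind, an optimization suggests itself: after fetching the list of children, each client watches only the one node that actually matters to it.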

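      • A read request only cares about the last write-request node with a smaller sequence number than its own, so it registers a Watcher on that node.
      • A write request only cares about the last node with a smaller sequence number than its own, so it registers a Watcher on that node.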
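Choosing which node to watch can also be expressed as a pure function over the child list. As before, this is a sketch that assumes node names end in "<type>-<sequence>" with "R" and "W" as the type strings. WatchTargets and nodeToWatch are hypothetical names, and an empty result means the request does not need to wait at all.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the original demo.
final class WatchTargets {

    /** Sequence suffix of a node name such as "R-0000000003". */
    static long sequenceOf(String node) {
        return Long.parseLong(node.substring(node.lastIndexOf('-') + 1));
    }

    /** Assumes the token before the sequence number is "W" for write requests. */
    static boolean isWrite(String node) {
        String[] parts = node.split("-");
        return parts.length >= 2 && "W".equals(parts[parts.length - 2]);
    }

    /** Returns the single node {@code self} should watch, if any. */
    static Optional<String> nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(WatchTargets::sequenceOf));
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("unknown node: " + self);
        }
        if (isWrite(self)) {
            // A write waits for whatever node sits directly in front of it.
            return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
        }
        // A read waits for the closest earlier write, skipping earlier reads.
        for (int i = index - 1; i >= 0; i--) {
            if (isWrite(sorted.get(i))) {
                return Optional.of(sorted.get(i));
            }
        }
        return Optional.empty();
    }
}
```

For children R-0000000001, R-0000000002, W-0000000003, R-0000000004, the write watches R-0000000002, the last read watches W-0000000003, and the first two reads watch nothing.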

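import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component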
public class SharedLockByCurator implements InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(SharedLockByCurator.class);

    private static final String ROOT_LOCK_PATH = "shared_lock";

    private final CuratorFramework curatorFramework;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    // Sort node names ("<type>-<sequence>") by their sequence number
    private static Ordering<String> ordering = Ordering.from((Comparator<String>) (o1, o2) -> {

        String[] s1 = o1.split("-");
        String[] s2 = o2.split("-");

        return StringUtils.compare(s1[1], s2[1]);
    }).nullsLast();

    public SharedLockByCurator(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework.usingNamespace("lock-namespace");
    }

    /**
     * Creates the /shared_lock node
     *
     * @date 15:25 2019-09-19
     **/
    @Override
    public void afterPropertiesSet() {

        String path = "/" + ROOT_LOCK_PATH;
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
        } catch (Exception e) {
            LOGGER.error("[SharedLockByCurator afterPropertiesSet] connect zookeeper fail", e);
        }
    }

    public String acquireDistributedLock(String path, OpType type) {

        String keyPath = "/" + ROOT_LOCK_PATH + "/" + path + "/" + type.getType() + "-";
        String parentPath = "/" + ROOT_LOCK_PATH + "/" + path;

        String thisPath = null;
        try {
            // Create an ephemeral sequential node
            thisPath = curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(keyPath);
            boolean haveTheLock = false;

            while (!haveTheLock) {

                // Fetch the list of child nodes
                List<String> childPaths = curatorFramework.getChildren()
                        .forPath(parentPath);

                List<String> sortedPaths = ordering.sortedCopy(childPaths);
                // Find this node's position in the sorted list
                int index = sortedPaths.indexOf(thisPath.split("/")[3]);

                if (index > 0) {
                    boolean flag = true;
                    // A read acquires the lock only if every node with a smaller sequence number is a read request
                    if (type == OpType.READ) {
                        for (int i = index - 1; i >= 0; i--) {
                            String temp = sortedPaths.get(i);
                            String tempType = temp.split("-")[0];
                            if (OpType.WRITE.getType().equals(tempType)) {
                                addWatcher(parentPath + "/" + sortedPaths.get(i));
                                flag = false;
                                break;
                            }
                        }
                        if (flag) {
                            LOGGER.info("[SharedLockByCurator acquireDistributedLock] success thread= {} path= {} index= {}", Thread.currentThread().getName(), thisPath, index);
                            haveTheLock = true;
                        }
                    } else {
                        addWatcher(parentPath + "/" + sortedPaths.get(index - 1));
                    }
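                    // Wait only if the lock has not been granted; a read that already holds the lock must not block
                    if (!haveTheLock) {
                        if (countDownLatch.getCount() <= 0) {
                            countDownLatch = new CountDownLatch(1);
                        }
                        countDownLatch.await();
                    }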

                } else {
                    LOGGER.info("[SharedLockByCurator acquireDistributedLock] success thread= {} path= {} index= {}", Thread.currentThread().getName(), thisPath, index);
                    // Guards against this client releasing the lock before other clients have managed to register a watcher... the author is still thinking this through
                    addWatcher(thisPath);
                    haveTheLock = true;
                }


            }

        } catch (Exception e) {
            LOGGER.error("[SharedLockByCurator acquireDistributedLock] error", e);
        }

        return thisPath;
    }

    private List<String> list = Collections.synchronizedList(Lists.newArrayList());

    private void addWatcher(String path) throws Exception {
        try {
            // Rough implementation: register a watcher for each path only once
            if (curatorFramework.checkExists().forPath(path) != null && !list.contains(path)) {
                final NodeCache cache = new NodeCache(curatorFramework, path, false);
                cache.start(true);
                cache.getListenable().addListener(() -> {
                    LOGGER.info("[SharedLockByCurator nodeChanged] thread= {} path: {}", Thread.currentThread().getName(), path);
                    countDownLatch.countDown();

                });
                list.add(path);
                LOGGER.info("[SharedLockByCurator addWatcher] thread= {} success path= {} ", Thread.currentThread().getName(), path);
            }
        } catch (Exception e) {
            LOGGER.error("[SharedLockByCurator addWatcher] error! ", e);
        }
    }

    public boolean releaseDistributedLock(String keyPath) {
        try {
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                curatorFramework.delete().forPath(keyPath);
                LOGGER.info("[SharedLockByCurator releaseDistributedLock] thread= {} success to release lock for path: {}", Thread.currentThread().getName(), keyPath);
            }
        } catch (Exception e) {
            LOGGER.error("[SharedLockByCurator releaseDistributedLock] failed to release lock", e);
            return false;
        }
        return true;
    }
}

Conclusion

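To be honest, implementing the shared lock from the book alone ran into plenty of problems, and there are a few points I still have not figured out. Later I plan to study Curator's source code and use it to improve this rough first version. It suddenly reminds me of what my mentor told me when I was an intern at a startup: "Get a first version working, then optimize it later. That beats having nothing at all."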

References