阅读 2853

从零开始的高并发(二)--- Zookeeper实现分布式锁

前言

上一篇 从0开始的高并发(一)--- zookeeper的基础概念 我们在结尾留下了一个分布式锁的坑,它保证了我们在多节点应用的一次调度还有解决分布式环境下的数据一致性的问题

比如我们现在拥有这么一个集群,集群里面有个缓存服务,集群中每个程序都会用到这个缓存,如果此时缓存中有一项缓存过期了,在大并发环境下,同一时刻中许许多多的服务都过来访问缓存,获取缓存中的数据,发现缓存过期,就要再去数据库取,然后更新到缓存服务中去。但是其实我们仅仅只需要一个请求过来数据库去更新缓存即可,然后这个场景,我们该怎么去做

我们参考多线程的场景下会使用到锁的这个方法,放到现在的并发场景下,我们也是需要通过一种锁来实现。


使用Zookeeper来进行开发

1.锁的特点与原生zookeeper

① 普通锁具备什么特点?

排他(互斥)性:只有一个线程能获取到
    文件系统(同一个文件不支持多个人去修改)
    数据库:主键唯一约束 for update
    缓存:redis setnx命令
    zookeeper:类似文件系统
阻塞性:其他未抢到的线程阻塞,直到锁被释放再进行抢这个行为
可重入性:线程获取锁后,后续是否可重复获得该锁
复制代码

② 为什么zookeeper可以用来实现锁

同一个父目录下面不能有相同的子节点,这就是zookeeper的排他性
通过JDK的栅栏来实现阻塞性
可重入性我们可以通过计数器来实现
复制代码

③ 原生的zookeeper存在着什么问题

1.接口难以使用
2.连接zookeeper超时不支持自动重连
3.watch注册一次会失效,需要反复注册
4.不支持递归创建节点(递归创建的话,比方说我要创建一个文件,假如我在idea创建,那我可以连带着包一起创建,但是在window我就做不到,这种整一个路径一并创建下来的就可以视为递归创建)
5.需要手动设置序列化的问题
复制代码

④ 创建客户端的核心类:Zookeeper

org.apache.zookeeper
org.apache.zookeeper.data

connect---连接到zookeeper集合
create---创建znode
exist---检查znode是否存在及其信息
getData---从特定的znode获取数据
setData---从特定的znode设置数据
getChildren---获取特定znode中的所有子节点
delete===删除特定znode及其所有子项
close---关闭连接
复制代码

2.使用第三方客户端zkClient来简化操作

① 实现序列化接口 ZkSerializer

MyZkSerializer.java

public class MyZkSerializer implements ZkSerializer {

//正常来说我们还需要进行一个非空判断,这里为了省事没做,不过严格来说是需要做的
//就是简单的转换
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        String d = (String) data;
        try {
            return d.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

}
复制代码

② zkclient的简单使用

ZkClientDemo.java

public class ZkClientDemo {
    public static void main(String[] args) {
        // 创建一个zk客户端
        ZkClient client = new ZkClient("localhost:2181");
        
        //实现序列化接口
        client.setZkSerializer(new MyZkSerializer());
        
        //创建一个节点zk,在zk节点下再创建一个子节点app6,赋值123
        //在之前也已经提到了,zookeeper中的节点既是文件夹也是文件
        
        //源码中CreateMode是一个枚举,CreateMode.PERSISTENT---当客户端断开连接时,znode不会自动删除
        client.create("/zk/app6", "123", CreateMode.PERSISTENT);

        client.subscribeChildChanges("/zk/app6", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath+"子节点发生变化:"+currentChilds);

            }
        });

        //这里开始是创建一个watch,但是为什么这个方法会命名为subscribeDataChanges()呢,原因是:
        //原本watch的设置然后获取是仅一次性的,现在我们使用subscribe这个英文,代表订阅,代表这个watch一直存在
        //使用这个方法我们可以轻易实现持续监听的效果,比原生zookeeper方便
        
        client.subscribeDataChanges("/zk/app6", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath+"节点被删除");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(dataPath+"发生变化:"+data);
            }
        });

        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

运行结果

调用ls /zk---可以发现app6已经被创建,

通过get /zk/app6---可获取到我们设置的123这个值

说明我们的程序没有问题,可以成功执行

这里测试监听事件

create /zk/app6/tellYourDream时---控制台打印/zk/app6子节点发生变化:[tellYourDream]

delete /zk/app6/tellYourDream---控制台打印/zk/app6子节点发生变化:[],此时已经不存在任何节点,所以为空

set /zk/app6 123456---/zk/app6发生变化:123456

delete /zk/app6---同时触发了两个监听事件,/zk/app6子节点发生变化:null 和 /zk/app6节点被删除

③ CreateMode 的补充

1.持久化节点:不删除节点永远存在。且可以创建子节点

/**
 * The znode will not be automatically deleted upon client's disconnect.
 * 持久无序
 */
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
* 持久有序
*/
PERSISTENT_SEQUENTIAL (2, false, true),
复制代码

2.非持久节点,换言之就是临时节点,临时节点就是客户端连接的时候创建,客户端挂起的时候,临时节点自动删除。不能创建子节点

/**
 * The znode will be deleted upon the client's disconnect.
 * 临时无序
 */
EPHEMERAL (1, true, false),
/**
 * The znode will be deleted upon the client's disconnect, and its name
 * will be appended with a monotonically increasing number.
 * 临时有序
 */
EPHEMERAL_SEQUENTIAL (3, true, true);
复制代码

还有更多的一些监听方法,我们可以自己去尝试一下。

3.Zookeeper实现分布式锁

① zookeeper实现分布式锁方式一

我们之前有提到,zookeeper中同一个子节点下面的节点名称是不能相同的,我们可以利用这个互斥性,就可以实现分布式锁的工具

临时节点就是创建的时候存在,消失的时候,节点自动删除,当客户端失联,网络不稳定或者崩溃的时候,这个通过临时节点所创建的锁就会自行消除。这样就可以完美避免死锁的问题。所以我们利用这个特性,实现我们的需求。

原理其实就是节点不可重名+watch机制。

比如说我们的程序有多个服务实例,哪个服务实例都去创建一个lock节点,谁创建了,谁就获得了锁,剩下我们没有创建的应用,就去监听这个lock节点,如果这个lock节点被删除掉,这时可能出现两种情况,一就是客户端连不上了,另一种就是客户端释放锁,将lock节点给删除掉了。

ZkDistributeLock.java(注意,不需要重写的方法已经删除)
public class ZkDistributeLock implements Lock {

    //我们需要一个锁的目录
    private String lockPath;

    //我们需要一个客户端
    private ZkClient client;


    //刚刚我们的客户端和锁的目录,这两个参数怎么传进来?
    //那就需要我们的构造函数来进行传值

    public ZkDistributeLock(String lockPath) {
        if(lockPath ==null || lockPath.trim().equals("")) {
            throw new IllegalArgumentException("patch不能为空字符串");
        }
        this.lockPath = lockPath;

        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
    }
复制代码

实现Lock接口要重写的方法(包括尝试创建临时节点tryLock(),解锁unlock(),上锁lock(),waitForLock()实现阻塞和唤醒的功能方法)

    // trylock方法我们是会尝试创建一个临时节点
    @Override
    public boolean tryLock() { // 不会阻塞
        // 创建节点
        try {
            client.createEphemeral(lockPath);
        } catch (ZkNodeExistsException e) {
            return false;
        }
        return true;
    }

    @Override
    public void unlock() {
        client.delete(lockPath);
    }


    @Override
    public void lock() {

        // 如果获取不到锁,阻塞等待
        if (!tryLock()) {

            // 没获得锁,阻塞自己
            waitForLock();

            // 从等待中唤醒,再次尝试获得锁
            lock();
        }

    }

    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);

        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----收到节点被删除了-------------");
                //唤醒阻塞线程
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
            }
        };

        client.subscribeDataChanges(lockPath, listener);

        // 阻塞自己
        if (this.client.exists(lockPath)) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 取消注册
        client.unsubscribeDataChanges(lockPath, listener);
    }
}
复制代码

ZkDistributeLock 现在我们再总结一下流程

获取锁,创建节点后

    1.成功获取到的---执行业务---然后释放锁
                                    |
                                    |
                                    |               
    2.获取失败,注册节点的watch---阻塞等待---取消watch---再回到获取锁,创建节点的判断
复制代码

这个设计会有一个缺点,比如我的实例现在有无数个,此时我们的lock每次被创建,有人获取了锁之后,其他的人都要被通知阻塞,此时我们就浪费了很多的网络资源,也就是惊群效应。

此时我们必须进行优化

② zookeeper实现分布式锁方式二

我们的Lock作为一个znode,也可以创建属于它的子节点,我们使用lock创建临时顺序节点,我们在从0开始的高并发(一)--- zookeeper的基础概念中已经提到了,zookeeper是有序的,临时顺序节点会自动进行由小到大的自动排序,此时我们把实例分配至这些顺序子节点上,然后编号最小的获取锁即可。这非常类似于我们的公平锁的概念,也是遵循FIFO原则的

原理:取号 + 最小号取lock + watch

同样是基于Lock接口的实现

ZkDistributeImproveLock.java(注意,不需要重写的方法已经删除)
public class ZkDistributeImproveLock implements Lock {

    /*
     * 利用临时顺序节点来实现分布式锁
     * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
     * 释放锁:删除自己创建的临时顺序节点
     */
     
     //同样的锁目录
    private String lockPath;

    //同样的客户端
    private ZkClient client;

    private ThreadLocal<String> currentPath = new ThreadLocal<String>();

    private ThreadLocal<String> beforePath = new ThreadLocal<String>();
    // 锁重入计数器
    private ThreadLocal<Integer> reenterCount = ThreadLocal.withInitial(()->0);

    public ZkDistributeImproveLock(String lockPath) {
        if(lockPath == null || lockPath.trim().equals("")) {
            throw new IllegalArgumentException("patch不能为空字符串");
        }
        this.lockPath = lockPath;
        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        if (!this.client.exists(lockPath)) {
            try {
                this.client.createPersistent(lockPath, true);
            } catch (ZkNodeExistsException e) {

            }
        }
    }

    @Override
    public boolean tryLock() {
        System.out.println(Thread.currentThread().getName() + "-----尝试获取分布式锁");
        if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
        
            //这里就是先去创建了一个临时顺序节点,在lockpath那里创建
            //用银行取号来表示这个行为吧,相当于每个实例程序先去取号,然后排队等着叫号的场景
            String node = this.client.createEphemeralSequential(lockPath + "/", "locked");
            //记录第一个节点编号
            currentPath.set(node);
            reenterCount.set(0);
        }

        // 获得所有的号
        List<String> children = this.client.getChildren(lockPath);

        // 把这些号进行排序
        Collections.sort(children);

        // 判断当前节点是否是最小的,和第一个节点编号做对比
        if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
            // 锁重入计数
            reenterCount.set(reenterCount.get() + 1);
            System.out.println(Thread.currentThread().getName() + "-----获得分布式锁");
            return true;
        } else {
            // 取到前一个
            // 得到字节的索引号
            int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
            String node = lockPath + "/" + children.get(curIndex - 1);
            beforePath.set(node);
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // 阻塞等待
            waitForLock();
            // 再次尝试加锁
            lock();
        }
    }

    private void waitForLock() {

        final CountDownLatch cdl = new CountDownLatch(1);

        // 注册watcher
        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(Thread.currentThread().getName() + "-----监听到节点被删除,分布式锁被释放");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        client.subscribeDataChanges(this.beforePath.get(), listener);

        // 怎么让自己阻塞
        if (this.client.exists(this.beforePath.get())) {
            try {
                System.out.println(Thread.currentThread().getName() + "-----分布式锁没抢到,进入阻塞状态");
                cdl.await();
                System.out.println(Thread.currentThread().getName() + "-----释放分布式锁,被唤醒");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 醒来后,取消watcher
        client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() + "-----释放分布式锁");
        if(reenterCount.get() > 1) {
            // 重入次数减1,释放锁
            reenterCount.set(reenterCount.get() - 1);
            return;
        }
        // 删除节点
        if(this.currentPath.get() != null) {
            this.client.delete(this.currentPath.get());
            this.currentPath.set(null);
            this.reenterCount.set(0);
        }
    }
复制代码

ps:不用担心内存占满的问题,JVM会进行垃圾回收

4.更为简单的第三方客户端---Curator

这里对于curator就不做展开了,有兴趣可以自己去玩下

地址:curator.apache.org/curator-exa…

对于选举leader,锁locking,增删改查的framework等都有实现

finally

距离上一篇的更新似乎隔了好一段1时间,也是因为上周比较忙抽不出空子来,之后还是会进行周更(尽力)

下一篇:从零开始的高并发(三)--- Zookeeper集群的leader选举

关注下面的标签,发现更多相似文章
评论