zookeeper分布式锁

719 阅读3分钟

       在项目中,当共享资源出现竞争情况的时候,为了防止出现并发问题,我们一般会采用锁机制来控制。在单机环境下,可以使用synchronized或Lock来实现;但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现,比如redis、zookeeper

为什么zookeeper可以用作分布式锁

       多个进程同一时间多个线程同时访问共享资源,但是锁只有一把,先到先得,其他迟到的只能阻塞,但是正在执行的线程的流程是竞争到锁的时候,设置锁标志,用完之后将锁标志清除。阻塞的线程只能一直轮询看看是不是到自己了,这种效率是非常低下的,所以zookeeper中的watch机制非常适合此场景,当节点发生变化时,通知到监听者。

如何实现

       锁,毫无疑问只能有一个,在zookeeper中只需要保证有一个唯一的节点或者维护一个固定标识位来实现。

  • 使用节点存储区域,也就是存储一个标识位,当一个线程获取锁时,先判断标志是否被占用,如果未被占用,则修改为占用状态,获取锁成功
  • 使用子节点,每个线程获取锁时,都在固定节点下创建临时顺序的子节点,默认最小节点线程获得锁,当释放锁时,删除对应的子节点即可

方案对比

  • 方案一的缺点是可能造成死锁,当一个线程成功获取锁后,执行过程中线程出现意外,无法释放锁,锁标识位一直存在,其他线程将一直等待
  • 方案二为了弥补方案一的致命缺陷,采用了临时节点,这样如果线程出现意外,失去zk连接之后,相对应的子节点也会自动清除
package com.example.zookeeperDemo;
import org.apache.zookeeper.*;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** * @Description: 手写简易zookeeper分布式锁 
    * @param:  
    * @return:  
    * @auther: songjie 
    * @date: 2019-08-21 20:55 
*/
public class ZkLock implements Watcher {    
    private ZooKeeper zk = null;    
    private int sessionTimeout = 3000;    
    private final String rootLockNodeName = "/root";    
    private String lockName = "/lock";    
    private String currentLock;    
    private String waitLockName;    
    private String splitStr = "_";    
    private CountDownLatch countDownLatch;    
    public ZkLock(String zkAddress) {        
        try {            
            zk = new ZooKeeper(zkAddress, sessionTimeout, this);            
            if (zk.exists(rootLockNodeName, false) == null) {                
                zk.create(rootLockNodeName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            
            }        
        } catch (IOException e) {            
            e.printStackTrace();        
        } catch (InterruptedException e) {           
            e.printStackTrace();        
        } catch (KeeperException e) {            
            e.printStackTrace();        
        }    
    }    

    public Boolean lock() {        
        if (tryLock()) {            
            System.out.println("线程" + Thread.currentThread().getName() + "获取锁" + currentLock + "成功");            
            return true;        
        } else {            
            return waitLock(this.waitLockName);        
        }    
    }
    
    private boolean tryLock() {        
        try {            
            this.currentLock = zk.create(rootLockNodeName + lockName + splitStr, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);            
            System.out.println("线程" + Thread.currentThread().getName() + "创建节点" + currentLock + ",开始竞争");            
            List<String> childNodes = zk.getChildren(this.rootLockNodeName, false);            
            if (!CollectionUtils.isEmpty(childNodes)) {                
                Collections.sort(childNodes);                
                if ((rootLockNodeName + "/" + childNodes.get(0)).equals(this.currentLock)) {                    
                    return true;                
                }            
            }            
            this.waitLockName = rootLockNodeName + "/" + childNodes.get(childNodes.indexOf(currentLock.split("/")[2]) -1);        
        } catch (KeeperException e) {            
            e.printStackTrace();        
        } catch (InterruptedException e) {            
            e.printStackTrace();        
        }        
        return false;    
    }
    
    private boolean waitLock(String waitLock) {        
        try {            
            if (zk.exists(waitLock, true) != null) {                
                System.out.println("线程" + Thread.currentThread().getName() + "加锁失败,等待" + waitLock);                
                this.countDownLatch = new CountDownLatch(1);                
                countDownLatch.await();                
                System.out.println("线程" + Thread.currentThread().getName() + "加锁成功," + waitLock + "锁已释放");                
                return true;            
            }        
        } catch (KeeperException e) {            
            e.printStackTrace();        
        } catch (InterruptedException e) {            
            e.printStackTrace();        
        }        
            return false;    
     }    

    public boolean unlock() {        
        try {            
            if (zk.exists(currentLock, false) != null) {                
                System.out.println("释放锁" + currentLock);                
                zk.delete(this.currentLock, -1);            
            }        
        } catch (KeeperException e) {            
            e.printStackTrace();        
        } catch (InterruptedException e) {            
            e.printStackTrace();        
        }        
        return true;    
    }    

    // zk监听器回调    
    @Override    
    public void process(WatchedEvent watchedEvent) {        
        if (watchedEvent != null && watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {            
            this.countDownLatch.countDown();        
        }    
    }}
package com.example.zookeeperDemo;
public class ZookeeperDemoApplication {   
    public static void main(String[] args) {      
        Runnable runnable = new Runnable() {         
            @Override         
            public void run() {            
                ZkLock zkLock = new ZkLock("127.0.0.1:2181");            
                if (zkLock.lock()) {               
                    System.out.println("线程" + Thread.currentThread().getName() + "正在运行");               
                    try {                  
                        Thread.sleep(1000);               
                    } catch (InterruptedException e) {                  
                        e.printStackTrace();               
                    }               
                    zkLock.unlock();            
                }         
            }      
        };      

        for (int i = 0; i < 5; i++) {         
            Thread t = new Thread(runnable);         
            t.start();      
        }   
}}