在项目中,当共享资源出现竞争情况的时候,为了防止出现并发问题,我们一般会采用锁机制来控制。在单机环境下,可以使用synchronized或Lock来实现;但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现,比如redis、zookeeper
为什么zookeeper可以用作分布式锁
多个进程同一时间多个线程同时访问共享资源,但是锁只有一把,先到先得,其他迟到的只能阻塞,但是正在执行的线程的流程是竞争到锁的时候,设置锁标志,用完之后将锁标志清除。阻塞的线程只能一直轮询看看是不是到自己了,这种效率是非常低下的,所以zookeeper中的watch机制非常适合此场景,当节点发生变化时,通知到监听者。
如何实现
锁,毫无疑问只能有一个,在zookeeper中只需要保证有一个唯一的节点或者维护一个固定标识位来实现。
- 使用节点存储区域,也就是存储一个标识位,当一个线程获取锁时,先判断标志是否被占用,如果未被占用,则修改为占用状态,获取锁成功
- 使用子节点,每个线程获取锁时,都在固定节点下创建临时顺序的子节点,默认最小节点线程获得锁,当释放锁时,删除对应的子节点即可
方案对比
- 方案一的缺点是可能造成死锁,当一个线程成功获取锁后,执行过程中线程出现意外,无法释放锁,锁标识位一直存在,其他线程将一直等待
- 方案二为了弥补方案一的致命缺陷,采用了临时节点,这样如果线程出现意外,失去zk连接之后,相对应的子节点也会自动清除
package com.example.zookeeperDemo;
import org.apache.zookeeper.*;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/** * @Description: 手写简易zookeeper分布式锁
* @param:
* @return:
* @auther: songjie
* @date: 2019-08-21 20:55
*/
public class ZkLock implements Watcher {
private ZooKeeper zk = null;
private int sessionTimeout = 3000;
private final String rootLockNodeName = "/root";
private String lockName = "/lock";
private String currentLock;
private String waitLockName;
private String splitStr = "_";
private CountDownLatch countDownLatch;
public ZkLock(String zkAddress) {
try {
zk = new ZooKeeper(zkAddress, sessionTimeout, this);
if (zk.exists(rootLockNodeName, false) == null) {
zk.create(rootLockNodeName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public Boolean lock() {
if (tryLock()) {
System.out.println("线程" + Thread.currentThread().getName() + "获取锁" + currentLock + "成功");
return true;
} else {
return waitLock(this.waitLockName);
}
}
private boolean tryLock() {
try {
this.currentLock = zk.create(rootLockNodeName + lockName + splitStr, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("线程" + Thread.currentThread().getName() + "创建节点" + currentLock + ",开始竞争");
List<String> childNodes = zk.getChildren(this.rootLockNodeName, false);
if (!CollectionUtils.isEmpty(childNodes)) {
Collections.sort(childNodes);
if ((rootLockNodeName + "/" + childNodes.get(0)).equals(this.currentLock)) {
return true;
}
}
this.waitLockName = rootLockNodeName + "/" + childNodes.get(childNodes.indexOf(currentLock.split("/")[2]) -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
private boolean waitLock(String waitLock) {
try {
if (zk.exists(waitLock, true) != null) {
System.out.println("线程" + Thread.currentThread().getName() + "加锁失败,等待" + waitLock);
this.countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println("线程" + Thread.currentThread().getName() + "加锁成功," + waitLock + "锁已释放");
return true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean unlock() {
try {
if (zk.exists(currentLock, false) != null) {
System.out.println("释放锁" + currentLock);
zk.delete(this.currentLock, -1);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
// zk监听器回调
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent != null && watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
this.countDownLatch.countDown();
}
}}
package com.example.zookeeperDemo;
public class ZookeeperDemoApplication {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
ZkLock zkLock = new ZkLock("127.0.0.1:2181");
if (zkLock.lock()) {
System.out.println("线程" + Thread.currentThread().getName() + "正在运行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkLock.unlock();
}
}
};
for (int i = 0; i < 5; i++) {
Thread t = new Thread(runnable);
t.start();
}
}}