阅读 666

从零开始的高并发(五)--- Zookeeper的配置中心应用

前言

前情概要

上一篇的内容是补充了ZAB协议和分布式队列的一种实现,ZAB我们谈到了它的一个协议流程和在和follower失联时的崩溃恢复,还有如何进行数据同步和丢弃事务。分布式队列的具体代码实现中的结构,还有类中定义的每个方法基本也都有提及了,相信大家也一定能够自己动手完成代码的补充并成功运行。分布式队列的代码逻辑如下图,注意使用的虚实线和线的颜色都指代了不同的行为。

以往链接

从零开始的高并发(一)--- Zookeeper的基础概念

从零开始的高并发(二)--- Zookeeper实现分布式锁

从零开始的高并发(三)--- Zookeeper集群的搭建和leader选举

从零开始的高并发(四)--- Zookeeper的经典应用场景

zookeeper在配置中心的应用

分布式环境下,我们的服务很多,配置确是共用一套的。处理起来会十分麻烦,配置中心可以帮助我们解决系统参数配置及参数的动态修改问题

1.何为配置中心

运维管理人员把配置修改之后进行提交,把配置推送到配置中心,我们分布式应用下的每个应用实例可以通过对watch事件的监听来获取配置中心文件的变更,在不重启服务的情况下也能做到把应用中的一些属性从内存中替换掉

2.zookeeper实现配置中心

zookeeper配置中心的数据结构

假设我们现在拥有这么一台zookeeper服务器,我们需要创建一个配置中心的根目录distributeConfigure,注意图中的server是指我们集群中的某项服务,server1-file1.cnf是指这个服务下的配置文件1,2,3···,server.port也属于其中一个配置,从这一层开始对应zookeeper下的一个个节点,也就是说,我们把每一项配置,比如刚刚的服务端口server.port,都看作是一个znode,然后一个一个往zookeeper中存过去即可。

此时我们不关心服务是否挂掉或者怎样,也不关心节点的顺序(除非是服务功能中有特殊要求),还有就是,我们的配置一般也会有该配置的名字,但是对于顺序一般也是不要求的,所以我们就会选用持久节点来记录配置。

此时应用服务要做的事情就很简单了,就是对这些个节点进行监控,只要节点存在变化,应用服务就把节点下的数据取过来即可。

配置中心的另一种思路

前面提到的都是一个配置项对应一个znode,那我们其实也可以换一种想法,比如我很多个配置项都放在了同一个文件下,那我就换成,一个文件对应一个znode,把文件下的内容都放置在znode的value里面

3.zookeeper实现配置中心的代码实现

① 角色1:运维人员

刚刚我们也提到过了,运维管理人员把配置修改之后进行提交推送给配置中心,那我们现在就得实现一个运维人员需要用到的接口出来,能够对配置文件进行读写操作。

public interface ConfigureWriter {
/**
 * 创建一个新的配置文件
 * @param fileName 文件名称
 * @param items 配置项
 * @return 新文件的在zk上的路径
 */
String createCnfFile(String fileName, Properties items);
/**
 * 删除一个配置文件
 * @param fileName
 */
void deleteCnfFile(String fileName);
/**
 * 修改一个配置文件
 * @param fileName
 * @param items
 */
void modifyCnfItem(String fileName, Properties items);
/**
 * 加载配置文件
 * @param fileName
 * @return
 */
Properties loadCnfFile(String fileName);
复制代码

}

② 应用服务器上所使用的接口

/**
 * 配置文件读取器
 * ConfigureReader
 */
public interface ConfigureReader {
	/**
	 * 读取配置文件
	 * @param fileName 配置文件名称
	 * @param ChangeHandler 配置发生变化的处理器
	 * @return 如果存在文件配置,则返回Properties对象,不存在返回null
	 */
	Properties loadCnfFile(String fileName);
	/**
	 * 监听配置文件变化,此操作只需要调用一次。
	 * @param fileName
	 * @param changeHandler
	 */
	void watchCnfFile(String fileName, ChangeHandler changeHandler);
	
	/**
	 * 配置文件变化处理器
	 * ChangeHandler
	 */
	interface ChangeHandler {
		/**
		 * 配置文件发生变化后给一个完整的属性对象
		 * @param newProp
		 */
		void itemChange(Properties newProp);
	}
}
复制代码

③ 测试代码

刚刚也提到了,运维人员需要使用到ConfigureWriter这个接口进行配置文件的读写操作,中途为了确保zookeeper上不存在这个节点,先执行了一次writer.deleteCnfFile(fileName),使用了一个线程去读配置文件

public class ConfigureTest {

	public static void main(String[] args) {
		// 模拟运维人员创建配置文件,引用ConfigureWriter接口操作
		ConfigureWriter writer = new ZkConfigureCenter();
		String fileName = "trade-application.properties";
		writer.deleteCnfFile(fileName);	// 测试,确保配置中心没有这个问题
		
		Properties items = new Properties();
		items.put("abc.gc.a", "123");
		items.put("abc.gc.b", "3456");
		// 创建配置文件,内容为 properties items的内容。
		String znodePath = writer.createCnfFile(fileName, items);
		System.out.println("new file: "+znodePath);
		
		
		new Thread(()->{
			readCnf();
		}).start();
		
		try {
			Thread.sleep(3000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// 3秒后修改文件内容,有新增、有删除、有修改
		items.put("abc.gc.a", "haha");	// 修改
		items.put("abc.gc.c", "xx");	// 新增
		items.remove("abc.gc.b"); // 删除
		writer.modifyCnfItem(fileName, items);
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 模拟应用程序加载配置文件,监听配置文件的变化
	 */
	public static void readCnf() {
		// 应用引用ConfigureReader接口进行操作
		System.out.println("读取并监听配置文件");
		ConfigureReader reader = new ZkConfigureCenter();
		String fileName = "trade-application.properties";
		Properties p = reader.loadCnfFile(fileName);		// 读取配置文件
		System.out.println(p);
		
		// 监听配置文件
		reader.watchCnfFile(fileName, new ChangeHandler() {
			@Override
			public void itemChange(Properties newProp) {
				System.out.println("发现数据发生变化:"+ newProp);
			}
		});
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
复制代码

运行效果

其实就是把我们的每一个配置项都放入zookeeper上

首先,我们建立了一个这样的配置文件 /distributeConfigure/cnfFile/trade-application.properties,该文件下的内容是

客户端这时也读取到了这个配置文件的相关信息

然后我们对这个配置文件进行了一些修改,在测试代码的进程处已经写得非常清楚做了哪些修改了,新增删除修改3个操作都进行了一次

此时如果断开连接,就会出现报错

我们可以打开zkClient通过命令来检查一下程序的可靠性,通过ls /path命令查看一下这些节点都是否正常之后,我们来手动修改一下节点数据

此时事情还没完,假设我们一次性对几百甚至上千个配置进行了修改,那岂不是一下子会弹出几百上千条通知吗,所以我们还要考虑一下请求合并,还有就是,我们可能也不止一个运维人员,也有可能同时有好几个人对同一个配置文件进行了修改,所以我们也可以考虑用锁来锁定这个配置文件,只让一个人来进行修改,不过我们要注意,因为zookeeper中对写事务的提交是有原子性的,写操作都会按顺序来进行,不过我们就会进行模拟,只允许一个人修改的情况。

了解以上问题之后,我们来说说上面代码中未提及的配置中心的实现

4.补充配置中心的具体实现

① 定义一个实现了读写接口的ZkConfigureCenter,ConfigureWriter是让管理员,也就是运维等人用的,ConfigureReader是让程序使用的

public class ZkConfigureCenter implements ConfigureWriter, ConfigureReader {}
复制代码

② 成员属性的定义

zkClient已经是连着好几篇文的老油条了,confRootPath是根目录,confFilePath是配置文件目录,fileLockPath是模拟锁的目录,因为我们会针对一个文件使用一把锁,那就肯定不止使用一把,所以我们就建立一个目录来存放这些锁。每当发起一次写操作,那就增加一个节点当文件锁。

private String confRootPath;
private String confFilePath;
private String fileLockPath;
private static final String default_confRootPath = "/distributeConfigure";
private ZkClient client;
复制代码

③ 构造器及简单的检查方法

public ZkConfigureCenter() {
		this(default_confRootPath);
	}
	
public ZkConfigureCenter(String path) {
	if(path == null || path.trim().equals("")) {
		throw new IllegalArgumentException("patch不能为空字符串");
	}
	confRootPath = path;
	confFilePath = confRootPath+"/cnfFile";
	fileLockPath = confRootPath+"/writeLock";
	client = new ZkClient("localhost:2181");
	client.setZkSerializer(new MyZkSerializer());
	if (!this.client.exists(confFilePath)) {
		try {
			this.client.createPersistent(confFilePath, true);
		} catch (ZkNodeExistsException e) {
			
		}
	}
}

//简单的参数检查
private void checkElement(String v) {
    if (v == null) throw new NullPointerException();
    if("".equals(v.trim())) {
    	throw new IllegalArgumentException("不能使用空格");
    }
    if(v.startsWith(" ") || v.endsWith(" ")) {
    	throw new IllegalArgumentException("前后不能包含空格");
    }
}
复制代码

④ 创建配置文件

首先这配置文件总得有个fileName吧,items就是代表这个配置文件下的各项属性,具体内容请看注释,里面使用到了我们在 从零开始的高并发(二)--- Zookeeper实现分布式锁 中的ZkDistributeImproveLock.java,代码位置在"使用zookeeper来进行开发"的 3 - ② zookeeper实现分布式锁方式二,如果想要跑一下前面的测试代码的话,建议去ctrl+c/+v一下即可,记得要把我删掉的不需要覆写的方法补全

@Override
public String createCnfFile(String fileName, Properties items) {
	checkElement(fileName);
	// 创建配置文件Node
	String cfgNode = confFilePath+"/"+fileName;
	
	//如果配置文件已经存在,总不能把别人的给覆写掉吧
	if(client.exists(cfgNode)) {
		throw new IllegalArgumentException("["+fileName+"]文件已存在!"); 
	}
	
	//没问题了,创建持久节点
	client.createPersistent(cfgNode, true);
	// 创建配置文件中的配置项
	if(items == null) {return cfgNode;}
	
	//这里我们创建了带上这个配置文件名字的一把分布式锁,不同的文件名就意味着不同的锁
	// ZkDistributeImproveLock的实现(参考"从零开始的高并发(二)--- Zookeeper实现分布式锁")
	Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
	distributeWriteLock.lock();
	try {
	    //以下就是对properties进行遍历然后把属性值一个个写进去而已
	    //如果真的没看懂这个,建议使用IDEA进行debug一下,
		items.keySet().iterator();
		Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
		for (Map.Entry<Object, Object> entry : entrySet) {
			System.out.println(entry.getKey() + "=" + entry.getValue());
			String cfgItemNode = cfgNode +"/"+ entry.getKey().toString();
			client.createPersistent(cfgItemNode, entry.getValue());
		} 
	} finally {
		distributeWriteLock.unlock();
	}
	return cfgNode;
}
复制代码

⑤ 删除方法

删除和创建需要征用同一把锁,不然我创建的时候你就把我的给删了这不太团结吧,deleteRecursive()方法是一个递归删除方法,如果没有获取到锁,会进行阻塞,也可以在分布式锁实现中指定一个恢复时间,这个时间内没有获取到锁,就把进程给结束掉,或者使用try,没有获取到,也就是有人在修改,那这时我们给个返回值也好,抛出个异常也好,告诉该进程有人在修改即可

@Override
public void deleteCnfFile(String fileName) {
	checkElement(fileName);
	String cfgNode = confFilePath+"/"+fileName;
	Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
	
	//获取锁
	distributeWriteLock.lock();
	try {
		client.deleteRecursive(cfgNode);
	} finally {
	    //释放锁
		distributeWriteLock.unlock();
	}
}
复制代码

下面是deleteRecursive的源码

它就会先从自己的子目录,也就是children那里开始找,然后遍历删除

⑥ 修改方法

这里我是把提交过来的properties文件整个放入了znode里面,提交过来的会默认为最新的一份配置,主要思路就在于,先获取原来的,然后再和现在新传过来的进行比对。

因为如果我们使用刚刚那个demo中先删后增的方法,可能我100个配置我就只修改了一个配置,但是还是把整个文件给重新弄一份,这样会引发很多的监听,所以try代码块里面是先获取到原来的配置信息,Set existentItemSet主要作用是去重,如果这个集合中包含了所修改的配置信息,就再判断数据是否已经有变动,有变动的情况下修改,然后把多余的配置(这里的多余指的是没有进行过对比处理的数据,因为修改或者说是不变都已经是经过对比处理的)给删除即可。如果这个集合中不包含我现在写入的配置项,就要新增。

@Override
public void modifyCnfItem(String fileName, Properties items) {
	checkElement(fileName);
	// 获取子节点信息
	String cfgNode = confFilePath+"/"+fileName;
	// 简单粗暴的实现
	if(items == null) {throw new NullPointerException("要修改的配置项不能为空");}
	items.keySet().iterator();
	Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
	Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
	distributeWriteLock.lock();
    try {
    	// 获取zk中已存在的配置信息
    	List<String> itemNodes = client.getChildren(cfgNode);
    	Set<String> existentItemSet = itemNodes.stream().collect(Collectors.toSet());
    	
		for (Map.Entry<Object, Object> entry : entrySet) {
			System.out.println(entry.getKey() + "=" + entry.getValue());
			String itemName = entry.getKey().toString();
			String itemData = entry.getValue().toString();
			
			String cfgItemNode = cfgNode + "/" + itemName;
			if(existentItemSet.contains(itemName)) {// zk中存在的配置项
				String itemNodeData = client.readData(cfgItemNode);
				if(! eql(itemNodeData, itemData)) { // 数据不一致才需要修改
					client.writeData(cfgItemNode, itemData);
				}
				existentItemSet.remove(itemName);	// 剩下的就是需要删除的配置项
			} else { // zk中不存在的配置项,新的配置项
				client.createPersistent(cfgItemNode, itemData);
			}
		}
		
		// existentItemSet中剩下的就是需要删除的
		if(!existentItemSet.isEmpty()) {
			for(String itemName : existentItemSet) {
				String cfgItemNode = cfgNode + "/" + itemName;
				client.delete(cfgItemNode);
			}
		}
	} finally {
		distributeWriteLock.unlock();
	}
}
复制代码

⑦ 读取方法

比较简单,没啥好说

@Override
public Properties loadCnfFile(String fileName) {
	if(! fileName.startsWith("/")) {
		fileName = confFilePath+"/"+fileName;
	}
	return loadNodeCnfFile(fileName);
}

private Properties loadNodeCnfFile(String cfgNode) {
	checkElement(cfgNode);
	if(! client.exists(cfgNode)) {
		throw new ZkNoNodeException(cfgNode);
	}
	// 获取子节点信息
	List<String> itemNodes = client.getChildren(cfgNode);
	
	// 读取配置信息,并装载到Properties中
	if(itemNodes == null || itemNodes.isEmpty()) {
		return new Properties();
	}
	Properties file = new Properties();
	itemNodes.stream().forEach((e)->{
		String itemNameNode = cfgNode + "/" + e;
		String data = client.readData(itemNameNode, true);
		file.put(e, data);
	});
	return file;
}
复制代码

⑧ 设置对应的监听事件

这里子节点的数据读取(也就是我刚刚打开zookeeper的zkClient然后用命令来新增节点)会有个问题,当我们需要新增节点的时候,我们是不会触发我们DataChange监听事件的,那是因为,我新增节点的时候,根本就还没有这个节点,在还没有这个节点的时候,是无法监听内容的变更的。所以我们在还是需要通过子节点的handleChildChange()来补救这个监听,这就是为什么代码最后需要用到client.subscribeChildChanges(···),此时监听父节点的子节点变更,如果子节点有发生变化了,那就触发事件即可,fileNodePath是父节点的路径

triggerHandler请参考 ⑨ 的内容

@Override
public void watchCnfFile(String fileName, ChangeHandler changeHandler) {
	if(! fileName.startsWith("/")) {
		fileName = confFilePath+"/"+fileName;
	}
	final String fileNodePath = fileName;
	// 读取文件
	Properties p = loadNodeCnfFile(fileNodePath);
	if(p != null) {
		// 合并5秒配置项变化,5秒内变化只触发一次处理事件
		int waitTime = 5;
		final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
		scheduled.setRemoveOnCancelPolicy(true);
		final List<ScheduledFuture<?>> futureList = new ArrayList<ScheduledFuture<?>>();
		Set<Map.Entry<Object, Object>> entrySet = p.entrySet();
		for (Map.Entry<Object, Object> entry : entrySet) {
			System.out.println("监控:"+fileNodePath+"/"+entry.getKey().toString());
			client.subscribeDataChanges(fileNodePath+"/"+entry.getKey().toString(), new IZkDataListener() {
				@Override
				public void handleDataDeleted(String dataPath) throws Exception {
					System.out.println("触发删除:"+dataPath);
					triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
				}
				
				@Override
				public void handleDataChange(String dataPath, Object data) throws Exception {
					System.out.println("触发修改:"+dataPath);
					triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
				}
			});
		}
		client.subscribeChildChanges(fileNodePath, new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				System.out.println("触发子节点:"+parentPath);
				triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
			}
		});
	}
	
}
复制代码

⑨ 合并

在 ⑧ 那里我们已经看到了一个配置项的修改会触发这么多个监听事件,这种做法不太可取,回到我们的ConfigureReader中,我们已经定义好的这么一个接口,把配置发生变化的配置项在5秒(waitTime)内所进行的修改都合并成一个事件

/**
 * 配置文件变化处理器
 * ChangeHandler
 */
interface ChangeHandler {
	/**
	 * 配置文件发生变化后给一个完整的属性对象
	 * @param newProp
	 */
	void itemChange(Properties newProp);
}
复制代码

再回到ZkConfigureCenter.java,比较方便的理解就是,我们把我们对提交上来的修改,根据时间划分为5秒一块,此时在这5秒之内最后一个修改任务之前的future,如果仍未执行成功,会进行cancel()取消掉,然后remove掉,我们只取这5秒内的最后一个事件作为我们监听事件触发的条件,所以与其说合并事件,不如就是单纯认为,我们取了5秒内futureList的最后一个future。

/**
 * 合并修改变化事件,5秒钟内发生变化的合并到一个事件进行
 * @param futureList 装有定时触发任务的列表
 * @param scheduled 定时任务执行器
 * @param waitTime 延迟时间,单位秒
 * @param fileName zk配置文件的节点
 * @param changeHandler 事件处理器
 */
private void triggerHandler(List<ScheduledFuture<?>> futureList, ScheduledThreadPoolExecutor scheduled, int waitTime, String fileName, ChangeHandler changeHandler) {
	if(futureList != null && !futureList.isEmpty()) {
		for(int i = 0 ; i < futureList.size(); i++) {
			ScheduledFuture<?> future = futureList.get(i);
			if(future != null && !future.isCancelled() && !future.isDone()) {
				future.cancel(true);
				futureList.remove(future);
				i--;
			}
		}
	}
	ScheduledFuture<?> future = scheduled.schedule(()->{
		Properties p = loadCnfFile(fileName);
		changeHandler.itemChange(p);
	}, waitTime, TimeUnit.SECONDS);
	futureList.add(future);
}
复制代码

至此配置中心的模拟实现就结束了。整个类的代码都在 ① ~ ⑨ 中,可以直接ctrl+c/+v使用

finally

配置中心的知识总结其实就是下面4个知识点

持久节点+watch机制+分布式锁+事件合并
复制代码

master和关于一些zookeeper官网的一些treasure的介绍搁置到下一篇···

next:从零开始的高并发(六)--- Zookeeper的经典应用场景3

关注下面的标签,发现更多相似文章
评论