前言
在前面几篇博客中,我总结了 ZooKeeper 的源码调试环境构建、集群群首选举、NIO 网络连接的博客,文章说实话写得比较乱,自己有些细节仍然是一知半解,或许写得不合理,对 ZooKeeper 有兴趣的同学可以在文章下留言,提提问题,大家一起讨论一下。这是我认为最适合学习源码的方法之一。
虽然不是很恰当,上图我自己对 ZooKeeper 的一些模块划分。Client 部分的代码有机会再分析,这篇文档把数据模型剖析清楚,应该算是在大体上把 ZooKeeper 的源码连接起来了,再去编写一个Watch模块(未来会分析),那自己去写一个简单的 ZooKeeper 应该也没问题。
前文连接:
PS: 本文对 ZooKeeper 的存储讨论都是基于单节点模型下的,集群模式下的 ZooKeeper 存储涉及到 Zab 一致性协议,后面有机会再讲。
ZK的存储:ZKDatabase、DataTree、FileTxnSnapLog
ZooKeeper 本质上是一个 K-V 存储中间件,这样基本上就必然会考虑数据的增删查改
还有持久化
的问题。 ZooKeeper 在API上是按照树型
样式来设计的,在数据结构上也差不多,ZKDatabase
则是这个数据结构的抽象。
ZKDatabase
从注释上看,ZKDatabase
的作用有如下几个:
- 维护内存数据库:指的是字段 DataTree
- 维护通信 session(ServerCnxn) 和有 watch(监控相关),这部分其实是委托给 DataTree 去完成的
这个对象是随着 ZooKeeper 的 main 方法启动而启动的,启动后会通过 FileTxnSnapLog 读取日志,构建 DataTree。
/**
* This class maintains the in memory database of zookeeper
* server states that includes the sessions, datatree and the
* committed logs. It is booted up after reading the logs
* and snapshots from the disk.
*/
public class ZKDatabase {
// 存储数据结构
protected DataTree dataTree;
// 快照日志
protected FileTxnSnapLog snapLog;
}
DataTree
DataTree
是负责维护树形数据结构的类。我认为核心的字段和方法有以下几个:
public class DataTree {
字段:
1. NodeHashMap nodes; 全路径 -> DataNode 的 HashMap 映射
2. dataWatches, childWatches 监视器
3. PathTrie pTrie; 前缀树,提供树的数据结构,响应客户端的操作。遍历、查找之类的
方法:
1. serialize(OutputArchive oa, String tag) // 序列化
2. deserialize(InputArchive ia, String tag) // 反序列化
3. processTxn(TxnHeader header, Record txn) // 响应事务,对节点进行 增删改
}
FileTxnSnapLog
FileTxnSnapLog
是用于保存 快照(Snap) 文件和 事务(Txn )文件的。所以这个类最终的是统筹 事务文件、快照文件
的操作。
public class FileTxnSnapLog {
//the directory containing the
//the transaction logs
final File dataDir; // 事务文件 句柄
//the directory containing the
//the snapshot directory
final File snapDir; // 快照文件 句柄
TxnLog txnLog; // 事务Log
SnapShot snapLog; // 快照Log
method:
// 反序列化,把数据还原到 DataTree 中,ZooKeeper 启动的时候调用
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
// 把 DataTree 序列化到快照文件中。
public void save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException
}
完成了三个我认为 ZooKeeper 数据模型比较核心的类的介绍后,下面分析两个关于 ZooKeeper 数据存储的细节。
- ZooKeeper 持久化策略及源码
- 响应 Client 的事务请求
DataTree 的持久化策略
ZooKeeper 作为一个存储中间件,持久化是一定需要解决的。在这里,ZooKeeper 使用的策略分两部分:
- 快照文件保存 Snapshot
- 事务文件保存 Txnlog
保存的原理和 Redis 基本是一致的,两种文件的优缺点也一样。在恢复数据的时候,ZooKeeper 是先执行快照文件,然后再尝试把落后的事务数据通过 Txnlog 进行回放
,最终达到恢复最新的事务数据。
下面简单剖析一下初始化ZK数据的源码
和修改ZK数据的源码
// ZKDatabase.java
public long loadDataBase() throws IOException {
long startTime = Time.currentElapsedTime();
/**
* snapLog 的类是包括: 快照文件+事务文件的,restore 方法是通过两个文件还原数据,并返回最新的 zxid
*/
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); // 从此处切入
initialized = true;
long loadTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
return zxid;
}
// FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
long snapLoadingStartTime = Time.currentElapsedTime();
// 通过快照文件 snapLog,反序列化 dataTree,并返回序列化的最大的 zxid 。
// 具体的做法,就是遍历快照文件,生成树。(Step 1)
long deserializeResult = snapLog.deserialize(dt, sessions);
ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
FileTxnLog txnLog = new FileTxnLog(dataDir);
--- 省略代码
File initFile = new File(dataDir.getParent(), "initialize");
RestoreFinalizer finalizer = () -> {
// 通过快照生成的 dataTree 有可能是落后与版本的,此时需要通过 事务日志 在快照的基础上,回放部分事务,并获得最新的数据
// 具体的做法:根据当前dt的最大事务数据为起点,回放后面的事务 (Step 2)
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
// The snapshotZxidDigest will reset after replaying the txn of the
// zxid in the snapshotZxidDigest, if it's not reset to null after
// restoring, it means either there are not enough txns to cover that
// zxid or that txn is missing
DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
if (snapshotZxidDigest != null) {
LOG.warn(
"Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
+ "which might lead to inconsistent state",
Long.toHexString(highestZxid),
Long.toHexString(snapshotZxidDigest.getZxid()));
}
return highestZxid; // 返回正在的,最大的 zxId
};
--- 省略代码
if (-1L == deserializeResult) {
// 找不到数据库,就尝试重新建造一个数据库
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
// ZOOKEEPER-3056: provides an escape hatch for users upgrading
// from old versions of zookeeper (3.4.x, pre 3.5.3).
if (!trustEmptySnapshot) {
throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
} else {
LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
return finalizer.run();
}
}
--- 省略代码
return finalizer.run(); // 执行回放,并返回最大的 zxid
}
Client 的事务请求
以上为初始化 ZooKeeper 数据库的核心逻辑。由于篇幅原因,Step1 和 Step2 的实现不进一步剖析了。下面分析一下这个 ZKDatabase 如何处理客户端的事务 Request。
这里先说一下 ZooKeeper 的处理逻辑:
- 在网络层接受到 Request 后,通过 SyncRequestProcessor 把 Request 写入事务日志中
- SyncRequestProcessor 根据 数据统计,判断是否需要把 DataTree 写一个快照
- FinalRequestProccessor 最后获得这个 Request 后,会对这个 Request 执行响应处理,修改 DataTree 数据
现在看看代码是怎么处理的:
SyncRequestProcessor.java 是一个线程,因此逻辑都在 run 方法中。 // 方法已经被精简了,源码会更加复杂一点
@Override
public void run() {
// 这个方法主要是 1、写事务日志;2、尝试写快照
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
// 从队列中取出 Request
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush(); // 假如取不到数据,就可以尝试进行flush,把数据都写到 DataTree 中去 从此处切入!
si = queuedRequests.take();
}
// track the number of records written to the log
if (!si.isThrottled() && zks.getZKDatabase().append(si)) { // append 一个 Request,只是写事务日志
if (shouldSnapshot()) {
// 临时起一个线程,对 DataTree 进行快照写盘
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
}
private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
}
ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
long flushStartTime = Time.currentElapsedTime();
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
if (this.nextProcessor == null) {
this.toFlush.clear();
} else {
while (!this.toFlush.isEmpty()) {
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
// 这里切入,Processor 的调用链传递 Requets 参数,响应 Requegt 请求,把数据同步到 DataTree 中。
// 此处代码比较深入,整体逻辑是吧 Request i 的操作写到 DataTree 中
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
}
lastFlushTime = Time.currentElapsedTime();
}
RequestProcessor 的特点
网络层接收到包后,进行一次请求封装,生成 Request 对象,交给 RequestProccessor 调用链进行处理。具体的做法是 Request 对象投入 RequestProccessor 的请求处理队列中,然后 RequestProccessor 自动消费并传递。
在这里,调用链的构建比较有意思,是通过 Wrapper 层层包装的。(Dubbo SPI 的 Wrapper 也是采用类似的做法)
// ZooKeeperServer.java ,构建 RequestProcessors 责任链
protected void setupRequestProcessors() {
// 通过代理模式层层封装 Request 处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this); // 最后执行的放在里面
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor) syncProcessor).start(); // ZooKeeper 单线程 处理事务
firstProcessor = new PrepRequestProcessor(this, syncProcessor); // 最前执行的放在外面
((PrepRequestProcessor) firstProcessor).start(); // 单线程处理事务,处理完后,提交到下一个 Processor 的 Request 队列里,下一个继续消费
}
另外,SyncRequestProcessor 类和 PrepRequestProcessor 类都是 Thread,并且全局只有一个,这样是了保证处理模型是单线程的,这样有两个好处:
- 让 ZKDatabase 保持简单,不用考虑并发问题;
- 减少线程切换的成本;
同时这里也反应出一个问题,为了保证存储的持久性,每个事务 Request 都需要写到事务文件才返回,这也意味着,ZooKeeper 是单线程同步写文件的,所以它的写性能就相当差,不适合做常规的存储服务。当然,它的定位是:分布式协调服务,这样的设计也是没问题的。
后记
本文整理一下,ZooKeeper 存储模型。从持久化策略来看,可以类比 Redis 的做法。在修改上,以 set 命令为例子,简单地推导了一下流程。由于代码确实比较深入,没完全理解,因此有一些很细节的代码没深入探讨。
写这个博客有个比较明显的经验:
- 源码阅读一定要先提出问题:例如 ZKDatabase 是如何被初始化的,又如何被客户端写入的
- 顺着问题去构想高层的设计,寻找关键的对象: DataTree, FileTxnSnapLog
- 围绕这些关键对象,针对问题,打断点,通过堆栈回溯
- 假如每次回溯都回到 main 方法,或者线程池,这样效果也不好,应该
分层阅读源码
。例如,回溯到网络层,由于网络层的源码我已经了解了,在知识体系对 ZooKeeper 的网络层设计有了概念。那么回溯到自己理解的层次就可以了。