前言
Debug 一下 ZooKeeper 群首选举的细节。
关联阅读
如果希望可以跟随这个文章调试 ZooKeeper 的源码,推荐阅读我写的架设教程,大概在半小时内可以启动Debug环境。
预览:
- 简述 ZooKeeper 关于群首选举的概念;
- 从 main 方法开始,逐步剖析群首选举的基础通信模块,QuorumCnxMananger 及其相关细节。
- 基于 QuorumCnxMananger 提供的点对点通信基础,讨论了 FastLeaderElection 的代码实现。
定义
ZooKeeper 功能上是一个分布式系统的协调中间件,本质上其实就是一个存储中间件,通过自定义的客户端和服务端的通信,达到一些订阅方面的额外功能。
既然是一个存储中间件,并且是在分布式环境下处理问题,就需要 CAP 理论上先做选型 ZooKeeper 属于的是 CP 理论。这个设计结论,可以通过反证法来获得。当 ZooKeeper 产生网络异常或主节点异常时,集群并不会接受任何的读写请求,直至集群恢复,这就意味着 ZooKeeper 在 CAP 理论上放弃了可用性 A。
实际上,ZooKeeper 集群在处理客户端读写时,写请求最终是一定落在主节点上的,这也意味着无论 ZooKeeper 的集群有多大,主节点写瓶颈都会存在,这是为什么 ZooKeeper 并不适合做较大规模的存储。这样设计的好处是一致性实现比较简单。
ZooKeeper 的主节点被命名为 Leader ,它的高可用性在于 Leader 宕机时,会自动从集群里快速进行群首选举
,恢复集群的可用性,在高可用方面还是很可靠的。下面是关于群首选举
的一些技术原理。
原理
节点角色
在 ZooKeeper 集群中,节点集合内会存在三种角色
- Leader 负责
写入请求
,发起提案
; - Follower 响应查询,转发
写入请求
,参与选举
和写入投票
- ObServer 响应查询,转发
写入请求
- 说说对 ZooKeeper 的期待
群首选举只涉及 Leader 和 Follower 节点,此文暂不讲述 ObServer 相关。
事务 transaction 及其 ZXID
ZooKeeper 集群会把读请求发散到其他节点上,以提高集群的处理读的能力。而写请求都会落点 Leader 上,写请求会涉及到提案等操作,我们把这个完成的一次由 Leader 发起的集群写操作,成为事务
。
每一个事务
都会有自己的标识符,一般缩写为 ZXID
,这个ZXID
是 64bit 的 Long 整数,前 32bit 存储的 epoch 值,任期数(一些翻译是 时间戳 是非常不合当的),后 32bit 存储的是 counter 值,表示计数值。
这里小结一下:Leader 节点发布所有的写请求中都会带有 ZXID
,这样的设计会有两个好处。
- 保证节点的数据更新都必定是顺序的,更新都一定是顺序的,这样就避免了很多并发写引起的问题(此处参考 MySQL 最高的隔离级别序列化)。
- 间接地实现了数据同步的幂等性,这样会让同步的逻辑可以更加简易,同时更容易实现重试,保证集群的数据一致性。
ZxidUtils.java 源码
/**
*
* ZXID : 64bit
* |--- 32bit 任期 ---|-- 32bit 事务计数器 --|
*/
/**
* 从 ZxId 获取 任期数
* @param zxid
* @return
*/
public static long getEpochFromZxid(long zxid) {
return zxid >> 32L;
}
/**
* 从 ZXID 获取 计数值
* @param zxid
* @return
*/
public static long getCounterFromZxid(long zxid) {
return zxid & 0xffffffffL;
}
/**
* 输入 epoch 和 counter , 组合获得一个 long 类型的 ZXID
* @param epoch
* @param counter
* @return
*/
public static long makeZxid(long epoch, long counter) {
return (epoch << 32L) | (counter & 0xffffffffL);
}
简单来说,每次 Leader 发起一次写操作,操作完成后,每个节点最终都会同步事务
时,会更新自己的 ZXID ,这个ZXID 会参与群首选举。
群首选举依据
- Epoch(任期数): 这个数值和上述的数值是一样的,都是由 Leader 发布的。Leader 的 Epoch 值是哪里获得的?就是自己上位的那一瞬间,自行确认的。
- ZXID: 如上文,不复述;
- SID: 在 zoo.cfg 文件 + myid 文件,人工确定的,不详述;
// 选票类
public class Vote {
private final int version;
private final long id; // 推选的 zk节点 的sid
private final long zxid; // 推选的 zk节点 的zxid
private final long electionEpoch; // 选举周期,一次选举会需要重复投票,新投票+1
private final long peerEpoch; // 被推举 Leader 的任期数
private final ServerState state; // 当前服务器的状态,默认 LOOKING
}
源码分析
QuorumPeer.java
在集群的选举层面,每个 ZK 节点都有一个 QuorumPeer 对象,这是对 ZK 节点在集群选举层面的抽象。简单理解:就是 ZK 设置了一个专门用于参与选举的议员一样。这样的设计是符合单一职责原则
的。
// QuorumPeer.java
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
// FastLeaderElection 算法下负责各个服务器底层Leader选举过程中的网络通信
private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
// 存储 ZK 数据的数据抽象
private ZKDatabase zkDb;
}
QuorumPeer 是继承 Thread 对象,这个比较好理解,因为 ZK 集群在选举角度上,总是从 LOOKING -> LEADING/FOLLOW -> LOOKING 不断地进行状态迁移的,于是需要一个 Thread 持续观察和进行响应变更。
QuorumPeer 的实例化和初始化
QuorumPeer 对象在 QuorumPeerMain.main() 方法启动后会被实例化。
// QuorumPeerMain.java v3.7.0 line:175行
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if (quorumPeer.isQuorumSaslAuthEnabled()) {
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}
quorumPeer.start(); // 启动线程 从这里切入
ZKAuditProvider.addZKStartStopAuditLog();
quorumPeer.join();
上述代码是 QuorumPeer 对象的构建过程,整体来说是把 Config 文件的内容设置到对象中,并且 start
线程。
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 加载 zkDataBase,把数据从持久化的磁盘加载进来,有机会聊数据模型的时候会详细说说
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 开始进行群首选举
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
// QuorumPeer.java 群首选举的入口,除了启动时会调用;发生 Leader 断线也会调用。
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// 假如处于 LOOKING 状态,先构建自己的选票。选择自己为Leader
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// 生成选举算法,目前只支持 type = 3 的类型了。从这里切入
this.electionAlg = createElectionAlgorithm(electionType);
}
// QuorumPeer.java
protected Election createElectionAlgorithm(int electionAlgorithm) {
... 省略其他代码
// 构建集群通信连接器
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
// fixme 暂不清楚这个
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start();
// FastLeaderElection 是目前 ZK 的默认选举实现,后面会重新提及。
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
... 省略其他代码
return le;
}
小结上面的代码段:
- 从
QuorumPeerMain
的main
方法开始,我们可以 debug 到,其会构建出QuorumPeer
对象; QuorumPeer
对象是集群里的选举单位,继承于Thread
类;- 在
QuorumPeer.start()
前,会执行按照如下的流程图,产生 Vote,QuorumCnxMananger,FastLeaderElection。 - 至此,
main
线程的工作就已经完成了。
QuorumCnxManager.java 节点间的通信实现
假设我们的配置文件里配置了3个 ZK Server 节点,那么在集群没达到法定节点数时,所有的节点都会尽力去互相联系,企图达到法定节点数。从下列的报错上看,不难发现 QuorumCnxManager
在做这个事情
server.1=127.0.0.1:2222:2223 (假设这是本节点)
server.2=127.0.0.1:3333:3334
server.3=127.0.0.1:4444:4445
// 节点会尽最大的努力去连接集群内的其他节点。
// 这里不难看到,QuorumCnxManager 负责这个工作
2020-05-10 12:35:12,201 [myid:1] - WARN [QuorumConnectionThread-[myid=1]-2:QuorumCnxManager@400] - Cannot open channel to 2 at election address /127.0.0.1:3334
java.net.ConnectException: Connection refused (Connection refused)
...
at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
...
2020-05-10 12:35:12,201 [myid:1] - WARN [QuorumConnectionThread-[myid=1]-3:QuorumCnxManager@400] - Cannot open channel to 3 at election address /127.0.0.1:4445
java.net.ConnectException: Connection refused (Connection refused)
...
at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
...
QuorumCnxManager 主要用在 FastLeaderElection 算法下,负责算法底层各个服务器之间的 TCP 连接和消息收发。它会连接集群中的其他服务器,并保证相同的两台服务器之间只有一个 TCP 连接。
节点间的数据如何传输和发送
在讨论 QuorumCnxManager
如何在选举过程中管理与其他节点的网络传输问题上,先留意几个关键对象。
QuorumCnxManager {
// ZKServer.sid -> 消息发送器 的映射
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// ZKServer.sid -> 消息发送队列 的映射。这样设置是保证消息发送不会交叉混乱。
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
// ZKServer.sid -> 给sid的最近一次发送数据缓存。和其他服务器连接成功后,会补发一次记录在这个 map 中的消息,防止上次连接断开时最后一条消息发送失败。接受端会做好去重操作。
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// 数据接收队列,用于保存其他节点发送过来的数据
public final BlockingQueue<Message> recvQueue;
}
在QuorumCnxManager
中还有两个内部类:SendWorker
和 RecvWorker
,他们都是Thread
,主要工作都在 run
方法中实现(比较简单,不贴代码了),结合上述的结构可以简单地定义他们的工作关系:
- 在一个节点建立
Socket
连接后,先构建一个SendWorker
、一个RecvWorker
,分别用于发送数据和接受数据,注册一个sid - sendQueue
到queueSendMap
中。 - 发送数据先是投递到
queueSendMap
的队列中,由SendWorker.run()
去消费。这是一个典型的生产消费者模型
,那发送数据自然是异步的。 RecvWorker.run()
会持续监听 TCP 连接的数据留,获取到数据后会直接投递到recvQueue
,供其他线程使用。
至此,我们已经知道节点在连接内是如何通信的了了,我们还要需要知道节点如何建立连接。
节点间如何建立连接
相比于我们编写的 C-S 服务器,集群有一个特点,是它们并没有 C-S 关系,都是平等的节点集合
。考虑一个问题:如何对一堆零散的节点集合完成两两TCP-Socket
相连?这里有两个问题需要解决的:
- 每个节点既能主动发起 TCP 连接请求,又能接受其他 TCP 的请求。
- 上述两者完成的请求必须是等价的(这里要解决一个A-B节点同时互相发起请求的问题,且最终只能达成一个 TCP 连接)。
在解决问题1上,QuorumCnxManager 定义了两个 Thread
。
QuorumConnectionReqThread
。这个线程接受 目的地址,然后发起请求的。QuorumConnectionReceiverThread
。这个线程是用于外部接受连接请求的。
两者的工作完成后,最后会向上一小节的几个Map注册一个sid。至此,QuorumCnxManager 节点间的通信实现就完成了。
FastLeaderElection.java 群首选举的实现
有了上文的通信基础,选举的算法就可以基于上述的通信进行工作了。
这里我们先从接口开始分析,然后了解一下 FastLeaderElection
的几个关键字段、内部类和方法。
public interface Election {
Vote lookForLeader() throws InterruptedException;
void shutdown();
}
public class FastLeaderElection implements Election {
// 数据发送队列
LinkedBlockingQueue<ToSend> sendqueue;
// 数据接收队列
LinkedBlockingQueue<Notification> recvqueue;
这两个只是消息的通道,FastLeaderElection 还定义了两个对象服务于这两个队列的:
// 会从队列中接收消息,并处理这些消息,这个类会比较复杂一点
class WorkerReceiver extends ZooKeeperThread {...}
// 这个只是负责将 QuorumCnxManager 的消息入队
class WorkerSender extends ZooKeeperThread {...}
// 向所有节点广播当前节点的 "选票"
sendNotifications();
}
选票的通信管理可以抽象成这样:
至此 FastLeaderElection
外的一切通信基础我们都了解完毕了。这时候可以开始讨论最具体的选举算法了 Vote lookForLeader()
。
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
/*
* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
*
* 票据Map,用于保存选票的
*/
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
*/
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
// 选票届数自增1
logicalclock.incrementAndGet();
// 初始化选票信息,自我认为是Leader
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
// 向所有节点推选自己为 Leader 节点
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// 循环等待:不选出主节点,就在此处死循环,直至节点被 stop 掉
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*
* 从 recvqueue 中获取选票
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
// 判断 sid 是不是有效的,投票者是不是有效以及投票者投票的对象。假如当前节点不认为这是有效的,放弃这个n选票
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
// LOOKING 收到选票,的处理过程
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
// 假如当前节点的选票次数已经落后,直接清理自己手上的选票。
// 群首选举的时候,往往会进行多次才能选出来,而由于网络通信关系,并不保证节点的 electionEpoch 是一致的
// 修正自己的 electionEpoch, 并清空 recvset
logicalclock.set(n.electionEpoch);
recvset.clear();
// 具体的选举PK逻辑 比任期、比zxid、比sid
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// 假如票据内的更"适合"Leader,则切换自己的选择
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 因为已经失效了,所以节点需要重新发布自己本次投票的数据。
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
// 收到一个无效的投票,忽略处理
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
// 收到当前届数的有效选票,票据内的更"适合"Leader,则切换自己的选择
updateProposal(n.leader, n.zxid, n.peerEpoch);
// 广播给所有其他节点
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
// 归档选票数据
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 统计
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// 判断是否已经超过半数
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
// 在继续收集一下数据,保证自己统计的票据是正确的。
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
// 没问题。修改自己的节点状态
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote; // 获得主节点
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
// LEADING Leader 收到选票会怎么样?
/*
* Consider all notifications from the same epoch
* together. 这种情况下,可能会出现在节点短暂假死,然后重新恢复的状态。例如:超长 STOP-THE-WORLD
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*
* Note that the outofelection map also stores votes from the current leader election.
* See ZOOKEEPER-1732 for more information.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
上述的代码比较复杂,结合代码注释查看会比较好,流程图中省略了部分细节,但应该不影响阅读,如下:
疑问:
- 参与选举的 Eposh 和 ZXID 是从哪里来的?
答:在 QuorumPeer 的初始化是,会初始化 ZKDatabase,在这个存储模型中,我们可以获得 ZXID。
- 那在运行过程中,Epoch 任期 和 ZXID 是不是大部分情况节点都是一致的?这样会不会导致Leader的选举基本按 sid 这样选下来。如果真的这样,假如 sid = 1 的在一个稳定性差的主机上,会不会导致经常 leader 都会选到它,整个集群为之表现不稳定啊?
- 节点和节点间的通信是相互的,也就是说有n个节点时,TCP 的连接数需要达到 n x n,加载节点通信和节点选举通信各占一个,也就是集群的通信连接数是 n x n x 2,算上客户端连接,那达到的TCP连接数是非常大的,ZooKeeper 集群会如何处理类似问题?还是说这是 ZooKeeper 集群的一个固有属性。
后记
ZooKeeper 的代码写得还是有点强,但可读性一般般。不过不能否认 ZooKeeper 是一个很强的瑞士军刀,虽然短小,但功能强大,Dubbo 利用它做注册中心,Kafka也依赖 ZooKeeper。这次 Debug 的过程还给我对 对等节点集群
的通信有了一个新的认识,群首选举的算法也很奇妙。但自己对 ZooKeeper 这个工具的使用具体得心应手估计还很长远。
另外文章中关于 ZooKeeper 还有不少误解和未考虑的细节地方,希望大家可以一起深入讨论,如发现错误,烦请指正一下。
上文写的注释,在此处都可以看到 zookeeper 源码阅读和调试