ZooKeeper(二) 源码剖析: 群首选举

1,242 阅读15分钟

前言

Debug 一下 ZooKeeper 群首选举的细节。

关联阅读

如果希望可以跟随这个文章调试 ZooKeeper 的源码,推荐阅读我写的架设教程,大概在半小时内可以启动Debug环境。

  1. ZooKeeper 源码环境架设

预览:

  1. 简述 ZooKeeper 关于群首选举的概念;
  2. 从 main 方法开始,逐步剖析群首选举的基础通信模块,QuorumCnxMananger 及其相关细节。
  3. 基于 QuorumCnxMananger 提供的点对点通信基础,讨论了 FastLeaderElection 的代码实现。

定义

ZooKeeper 功能上是一个分布式系统的协调中间件,本质上其实就是一个存储中间件,通过自定义的客户端和服务端的通信,达到一些订阅方面的额外功能。

既然是一个存储中间件,并且是在分布式环境下处理问题,就需要 CAP 理论上先做选型 ZooKeeper 属于的是 CP 理论。这个设计结论,可以通过反证法来获得。当 ZooKeeper 产生网络异常或主节点异常时,集群并不会接受任何的读写请求,直至集群恢复,这就意味着 ZooKeeper 在 CAP 理论上放弃了可用性 A。

实际上,ZooKeeper 集群在处理客户端读写时,写请求最终是一定落在主节点上的,这也意味着无论 ZooKeeper 的集群有多大,主节点写瓶颈都会存在,这是为什么 ZooKeeper 并不适合做较大规模的存储。这样设计的好处是一致性实现比较简单。

ZooKeeper 的主节点被命名为 Leader ,它的高可用性在于 Leader 宕机时,会自动从集群里快速进行群首选举,恢复集群的可用性,在高可用方面还是很可靠的。下面是关于群首选举的一些技术原理。

原理

节点角色

在 ZooKeeper 集群中,节点集合内会存在三种角色

  1. Leader 负责写入请求,发起提案;
  2. Follower 响应查询,转发写入请求,参与选举写入投票
  3. ObServer 响应查询,转发写入请求
  4. 说说对 ZooKeeper 的期待

群首选举只涉及 Leader 和 Follower 节点,此文暂不讲述 ObServer 相关。

事务 transaction 及其 ZXID

ZooKeeper 集群会把读请求发散到其他节点上,以提高集群的处理读的能力。而写请求都会落点 Leader 上,写请求会涉及到提案等操作,我们把这个完成的一次由 Leader 发起的集群写操作,成为事务

每一个事务都会有自己的标识符,一般缩写为 ZXID,这个ZXID是 64bit 的 Long 整数,前 32bit 存储的 epoch 值,任期数(一些翻译是 时间戳 是非常不合当的),后 32bit 存储的是 counter 值,表示计数值

这里小结一下:Leader 节点发布所有的写请求中都会带有 ZXID ,这样的设计会有两个好处。

  1. 保证节点的数据更新都必定是顺序的,更新都一定是顺序的,这样就避免了很多并发写引起的问题(此处参考 MySQL 最高的隔离级别序列化)。
  2. 间接地实现了数据同步的幂等性,这样会让同步的逻辑可以更加简易,同时更容易实现重试,保证集群的数据一致性。
ZxidUtils.java 源码
    /**
     *
     * ZXID : 64bit
     * |--- 32bit 任期 ---|-- 32bit 事务计数器 --|
     */

    /**
     * 从 ZxId 获取 任期数
     * @param zxid
     * @return
     */
    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    /**
     * 从 ZXID 获取 计数值
     * @param zxid
     * @return
     */
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    /**
     * 输入 epoch 和 counter , 组合获得一个 long 类型的 ZXID
     * @param epoch
     * @param counter
     * @return
     */
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

简单来说,每次 Leader 发起一次写操作,操作完成后,每个节点最终都会同步事务时,会更新自己的 ZXID ,这个ZXID 会参与群首选举。

群首选举依据

  • Epoch(任期数): 这个数值和上述的数值是一样的,都是由 Leader 发布的。Leader 的 Epoch 值是哪里获得的?就是自己上位的那一瞬间,自行确认的。
  • ZXID: 如上文,不复述;
  • SID: 在 zoo.cfg 文件 + myid 文件,人工确定的,不详述;
// 选票类
public class Vote {
    private final int version;
    private final long id; // 推选的 zk节点 的sid
    private final long zxid; // 推选的 zk节点 的zxid
    private final long electionEpoch; // 选举周期,一次选举会需要重复投票,新投票+1
    private final long peerEpoch; // 被推举 Leader 的任期数
    private final ServerState state; // 当前服务器的状态,默认 LOOKING
}

源码分析

QuorumPeer.java

在集群的选举层面,每个 ZK 节点都有一个 QuorumPeer 对象,这是对 ZK 节点在集群选举层面的抽象。简单理解:就是 ZK 设置了一个专门用于参与选举的议员一样。这样的设计是符合单一职责原则的。

// QuorumPeer.java
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    
    // FastLeaderElection 算法下负责各个服务器底层Leader选举过程中的网络通信
    private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();

    // 存储 ZK 数据的数据抽象
    private ZKDatabase zkDb;
}

QuorumPeer 是继承 Thread 对象,这个比较好理解,因为 ZK 集群在选举角度上,总是从 LOOKING -> LEADING/FOLLOW -> LOOKING 不断地进行状态迁移的,于是需要一个 Thread 持续观察和进行响应变更。

QuorumPeer 的实例化和初始化

QuorumPeer 对象在 QuorumPeerMain.main() 方法启动后会被实例化。

// QuorumPeerMain.java v3.7.0 line:175行
            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();

            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }

            quorumPeer.start();     // 启动线程 从这里切入
            ZKAuditProvider.addZKStartStopAuditLog();
            quorumPeer.join();

上述代码是 QuorumPeer 对象的构建过程,整体来说是把 Config 文件的内容设置到对象中,并且 start 线程。

    @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // 加载 zkDataBase,把数据从持久化的磁盘加载进来,有机会聊数据模型的时候会详细说说
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        // 开始进行群首选举
        startLeaderElection();
        startJvmPauseMonitor();
        super.start();
    }
    
    // QuorumPeer.java 群首选举的入口,除了启动时会调用;发生 Leader 断线也会调用。
    public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
                // 假如处于 LOOKING 状态,先构建自己的选票。选择自己为Leader
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }

        // 生成选举算法,目前只支持 type = 3 的类型了。从这里切入
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    
    // QuorumPeer.java
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        ... 省略其他代码
        
        // 构建集群通信连接器
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        // fixme 暂不清楚这个
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            // FastLeaderElection 是目前 ZK 的默认选举实现,后面会重新提及。
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        
        ... 省略其他代码
        return le;
    }

小结上面的代码段:

  1. QuorumPeerMainmain 方法开始,我们可以 debug 到,其会构建出 QuorumPeer 对象;
  2. QuorumPeer 对象是集群里的选举单位,继承于 Thread 类;
  3. QuorumPeer.start() 前,会执行按照如下的流程图,产生 Vote,QuorumCnxMananger,FastLeaderElection。
  4. 至此,main 线程的工作就已经完成了。

QuorumCnxManager.java 节点间的通信实现

假设我们的配置文件里配置了3个 ZK Server 节点,那么在集群没达到法定节点数时,所有的节点都会尽力去互相联系,企图达到法定节点数。从下列的报错上看,不难发现 QuorumCnxManager 在做这个事情

server.1=127.0.0.1:2222:2223 (假设这是本节点)
server.2=127.0.0.1:3333:3334
server.3=127.0.0.1:4444:4445

// 节点会尽最大的努力去连接集群内的其他节点。
// 这里不难看到,QuorumCnxManager 负责这个工作

2020-05-10 12:35:12,201 [myid:1] - WARN  [QuorumConnectionThread-[myid=1]-2:QuorumCnxManager@400] - Cannot open channel to 2 at election address /127.0.0.1:3334
java.net.ConnectException: Connection refused (Connection refused)
	...
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
	...
2020-05-10 12:35:12,201 [myid:1] - WARN  [QuorumConnectionThread-[myid=1]-3:QuorumCnxManager@400] - Cannot open channel to 3 at election address /127.0.0.1:4445
java.net.ConnectException: Connection refused (Connection refused)
    ...
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
	...

QuorumCnxManager 主要用在 FastLeaderElection 算法下,负责算法底层各个服务器之间的 TCP 连接和消息收发。它会连接集群中的其他服务器,并保证相同的两台服务器之间只有一个 TCP 连接。

注释选自:ZooKeeper 学习笔记(四)-深入理解群首选举

节点间的数据如何传输和发送

在讨论 QuorumCnxManager 如何在选举过程中管理与其他节点的网络传输问题上,先留意几个关键对象。

QuorumCnxManager {
    // ZKServer.sid -> 消息发送器 的映射
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // ZKServer.sid -> 消息发送队列 的映射。这样设置是保证消息发送不会交叉混乱。
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
    // ZKServer.sid -> 给sid的最近一次发送数据缓存。和其他服务器连接成功后,会补发一次记录在这个 map 中的消息,防止上次连接断开时最后一条消息发送失败。接受端会做好去重操作。
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    // 数据接收队列,用于保存其他节点发送过来的数据
    public final BlockingQueue<Message> recvQueue;
}

QuorumCnxManager中还有两个内部类:SendWorkerRecvWorker,他们都是Thread,主要工作都在 run 方法中实现(比较简单,不贴代码了),结合上述的结构可以简单地定义他们的工作关系:

  1. 在一个节点建立 Socket 连接后,先构建一个 SendWorker、一个RecvWorker,分别用于发送数据和接受数据,注册一个 sid - sendQueuequeueSendMap 中。
  2. 发送数据先是投递到 queueSendMap 的队列中,由 SendWorker.run() 去消费。这是一个典型的生产消费者模型,那发送数据自然是异步的。
  3. RecvWorker.run() 会持续监听 TCP 连接的数据留,获取到数据后会直接投递到 recvQueue,供其他线程使用。

至此,我们已经知道节点在连接内是如何通信的了了,我们还要需要知道节点如何建立连接

节点间如何建立连接

相比于我们编写的 C-S 服务器,集群有一个特点,是它们并没有 C-S 关系,都是平等的节点集合。考虑一个问题:如何对一堆零散的节点集合完成两两TCP-Socket相连?这里有两个问题需要解决的:

  1. 每个节点既能主动发起 TCP 连接请求,又能接受其他 TCP 的请求。
  2. 上述两者完成的请求必须是等价的(这里要解决一个A-B节点同时互相发起请求的问题,且最终只能达成一个 TCP 连接)。

在解决问题1上,QuorumCnxManager 定义了两个 Thread

  • QuorumConnectionReqThread。这个线程接受 目的地址,然后发起请求的。
  • QuorumConnectionReceiverThread。这个线程是用于外部接受连接请求的。

两者的工作完成后,最后会向上一小节的几个Map注册一个sid。至此,QuorumCnxManager 节点间的通信实现就完成了。

FastLeaderElection.java 群首选举的实现

有了上文的通信基础,选举的算法就可以基于上述的通信进行工作了。

这里我们先从接口开始分析,然后了解一下 FastLeaderElection 的几个关键字段、内部类和方法。

public interface Election {
    Vote lookForLeader() throws InterruptedException;
    void shutdown();
}

public class FastLeaderElection implements Election {
    // 数据发送队列
    LinkedBlockingQueue<ToSend> sendqueue;
    // 数据接收队列
    LinkedBlockingQueue<Notification> recvqueue;
    
    这两个只是消息的通道,FastLeaderElection 还定义了两个对象服务于这两个队列的:
    
    // 会从队列中接收消息,并处理这些消息,这个类会比较复杂一点
    class WorkerReceiver extends ZooKeeperThread {...}
    
    // 这个只是负责将 QuorumCnxManager 的消息入队
    class WorkerSender extends ZooKeeperThread {...}
    
    // 向所有节点广播当前节点的 "选票"
    sendNotifications(); 
}

选票的通信管理可以抽象成这样:

图片选择 ZooKeeper 学习笔记(四)-深入理解群首选举

至此 FastLeaderElection 外的一切通信基础我们都了解完毕了。这时候可以开始讨论最具体的选举算法了 Vote lookForLeader()

    /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
             * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
             * of participants has voted for it.
             * 
             * 票据Map,用于保存选票的
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * The votes from previous leader elections, as well as the votes from the current leader election are
             * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
             * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
             * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
             * the electionEpoch of the received notifications) in a leader election.
             */
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // 选票届数自增1
                logicalclock.incrementAndGet();
                // 初始化选票信息,自我认为是Leader
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // 向所有节点推选自己为 Leader 节点
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                // 循环等待:不选出主节点,就在此处死循环,直至节点被 stop 掉
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 * 
                 * 从 recvqueue 中获取选票
                 */
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    // 判断 sid 是不是有效的,投票者是不是有效以及投票者投票的对象。假如当前节点不认为这是有效的,放弃这个n选票

                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // LOOKING 收到选票,的处理过程
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            // 假如当前节点的选票次数已经落后,直接清理自己手上的选票。
                            // 群首选举的时候,往往会进行多次才能选出来,而由于网络通信关系,并不保证节点的 electionEpoch 是一致的

                            // 修正自己的 electionEpoch, 并清空 recvset
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();

                            // 具体的选举PK逻辑 比任期、比zxid、比sid
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // 假如票据内的更"适合"Leader,则切换自己的选择
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            // 因为已经失效了,所以节点需要重新发布自己本次投票的数据。
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                                // 收到一个无效的投票,忽略处理
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            // 收到当前届数的有效选票,票据内的更"适合"Leader,则切换自己的选择
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            // 广播给所有其他节点
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        // 归档选票数据
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // 统计
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        if (voteSet.hasAllQuorums()) {
                            // 判断是否已经超过半数

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                // 在继续收集一下数据,保证自己统计的票据是正确的。
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                // 没问题。修改自己的节点状态
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote; // 获得主节点
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // LEADING Leader 收到选票会怎么样?
                        /*
                         * Consider all notifications from the same epoch
                         * together. 这种情况下,可能会出现在节点短暂假死,然后重新恢复的状态。例如:超长 STOP-THE-WORLD
                         */
                        if (n.electionEpoch == logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         *
                         * Note that the outofelection map also stores votes from the current leader election.
                         * See ZOOKEEPER-1732 for more information.
                         */
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

上述的代码比较复杂,结合代码注释查看会比较好,流程图中省略了部分细节,但应该不影响阅读,如下:

疑问:

  1. 参与选举的 Eposh 和 ZXID 是从哪里来的?

答:在 QuorumPeer 的初始化是,会初始化 ZKDatabase,在这个存储模型中,我们可以获得 ZXID。

  1. 那在运行过程中,Epoch 任期 和 ZXID 是不是大部分情况节点都是一致的?这样会不会导致Leader的选举基本按 sid 这样选下来。如果真的这样,假如 sid = 1 的在一个稳定性差的主机上,会不会导致经常 leader 都会选到它,整个集群为之表现不稳定啊?
  2. 节点和节点间的通信是相互的,也就是说有n个节点时,TCP 的连接数需要达到 n x n,加载节点通信和节点选举通信各占一个,也就是集群的通信连接数是 n x n x 2,算上客户端连接,那达到的TCP连接数是非常大的,ZooKeeper 集群会如何处理类似问题?还是说这是 ZooKeeper 集群的一个固有属性。

后记

ZooKeeper 的代码写得还是有点强,但可读性一般般。不过不能否认 ZooKeeper 是一个很强的瑞士军刀,虽然短小,但功能强大,Dubbo 利用它做注册中心,Kafka也依赖 ZooKeeper。这次 Debug 的过程还给我对 对等节点集群 的通信有了一个新的认识,群首选举的算法也很奇妙。但自己对 ZooKeeper 这个工具的使用具体得心应手估计还很长远。

另外文章中关于 ZooKeeper 还有不少误解和未考虑的细节地方,希望大家可以一起深入讨论,如发现错误,烦请指正一下。

上文写的注释,在此处都可以看到 zookeeper 源码阅读和调试

引用

  1. 《ZooKeeper 分布式过程协同技术详解》 Flavio Junqueira Benjamin Reed 著
  2. CAP理论小结
  3. ZooKeeper 学习笔记(四)-深入理解群首选举