Elasticsearch 选主流程

3,559 阅读5分钟

Master 节点周期性地向集群中的其他节点发送ping, 检查节点是否仍然活跃,如果发现集群中节点数量不足,则放弃 master 身份,重新执行选主流程。其他节点也周期性地检查 master 是否活跃,如果得不到 master 的恢复则该节点重新执行选主流程

集群中节点发现和选主依赖 Discovery Interface,ES 内置的实现是 Zen Discovery

Elasticsearch的选主算法 -- Bully

Elasticsearch 集群中节点数量有限,单个节点能够处理和其他所有节点间的连接,集群中不会出现节点频繁加入和离开的情况,因此 Zen Discovery 中使用了实现比较简单的 Bully 算法。

使用 Bully 算法的集群中有一个 master 维护整个集群的状态信息,并定时向集群中其他节点推送集群状态的版本信息,如果直接使用 Bully 算法,会遇到下面几个问题,因此 Elasticsearch 做了特殊化的处理

问题1:master节点假死

master 可能因负载过重而处于不稳定的状态,可能无法响应某些节点的请求,但短时间内可以恢复正常,为了避免频繁的选举,ES 中使用了推迟选举的方法,直到 master 失效才进行选举。当节点收不到 master 的响应时会先请求其他节点,获取活跃 master 的列表,确定 master 挂掉后再发起选举。

问题2:脑裂

考虑下面这种情况,原集群由 A、B、C、D、E 组成,主节点是 A。当两个交换机之间连接中断之后,A、B 不能再与C、D、E 进行通信,形成了两个网络分区。A、B 组成的集群中 master 仍然是 A,而C、D、E 会选出一个新的 master,客户端访问 A、B 或者高 C、D、E 时数据不一致。

img

img

Elasticsearch 采用了设置 “法定得票人数过半” 解决,在选举过程中当节点得票达到 discovery.zen.minimum_master_nodes 的值时才能成为 master,这个值通常设定为(具有master资格节点数 / 2)+1

将上例中 discovery.zen.minimum_master_nodes 设置为3。当两个交换机之间连接中断之后,A不能再与C,D,E进行通信,C、D、E所组成的网络分区中不存在活跃的master,因此发起选举。A的集群中只剩下A和B,不足minimum_master_nodes,因此放弃 master 身份。C、D、E进行投票,直到达成一致选出一个 master,形成一个新的集群,其中节点数量为 3,刚好满足 minimum_master_nodes,而A、B不再处理来自客户端的请求。

选主过程

通过 ZenDiscovery::findMaster 确定临时 master:

private DiscoveryNode findMaster() {
    // 1. PING 所有节点,获取各节点保存的集群信息
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    
    // 2. 由于上面是获取的其他节点的信息,这里需要将本节点加上
    final DiscoveryNode localNode = transportService.getLocalNode();    
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));    
    // 3. 若设置了 master_election_ignore_non_masters 则去掉没有 master 资格(node.master: false)的节点
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    
    // 4. 将各节点认为的 master 加入 activeMasters 列表
    List<DiscoveryNode> activeMasters = new ArrayList<>();    
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // 避免未经其他节点检查就将本节点选为 master
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {           
            activeMasters.add(pingResponse.master());        
        }    
    }    
    // 5. 将 PING 到的具有 master 资格的节点加入 masterCandidates 列表作为候选节点
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();    
    for (ZenPing.PingResponse pingResponse : pingResponses) {        
        if (pingResponse.node().isMasterNode()) {            
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));        
        }    
    }
    
    if (activeMasters.isEmpty()) {
        // 6. 没有活跃的 master
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            // 7. 拥有足够的候选节点,则进行选举
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);                   
            return winner.getNode();        
        } else {
            // 8. 无法选举,无法得到 master,返回 null
            return null;        
        }    
    } else {
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";        
        // 9. 有活跃的 master,从 activeMasters 中选择
        return electMaster.tieBreakActiveMasters(activeMasters);    
    }
}
  1. 在没有活跃的 master 时使用,上面第 7 步执行 master 的选举,通过 MasterCandidate::compare 对候选节点进行比较

    // ElectMasterService.MasterCandidate::compare
    public static int compare(MasterCandidate c1, MasterCandidate c2) {
        // 优先选择集群状态版本最新的节点
        int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
        if (ret == 0) {
            ret = compareNodes(c1.getNode(), c2.getNode());
        }
        return ret;
    }
    
    // ElectMasterService::compareNodes
    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // 集群状态版本相同时优先选择具有 master 资格的节点
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // 以上条件都相同时选择 ID 较小的节点
        return o1.getId().compareTo(o2.getId());
    }
    
  2. 在有活跃的 master 时,上面第 9 步通过 ElectMasterService::tieBreakAcitveMasters 使用 ElectMasterService::compareNodes 的规则从 activeMasters 中选择

根据选出的临时 master 是否本节点等待或投票

if (transportService.getLocalNode().equals(masterNode)) {
    // 选出的临时 master 是本节点,则等待被选举为真正的 master
    final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
    nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
             new NodeJoinController.ElectionCallback() {
                 @Override
                 public void onElectedAsMaster(ClusterState state) {
                     // 成功被选举为 master
                     synchronized (stateMutex) {
                         joinThreadControl.markThreadAsDone(currentThread);
                     }
                 }

                 @Override
                 public void onFailure(Throwable t) {
                     // 等待超时,重新开始选举流程
                     synchronized (stateMutex) {
                         joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                     }
                 }
             }
    );
} else {
    // 选出的临时 master 不是本节点,不再接收其他节点的 join 请求
    nodeJoinController.stopElectionContext(masterNode + " elected");

    // 向临时节点发送 join 请求(投票),被选举的临时 master 在确认成为 master 并发布新的集群状态后才会返回
    final boolean success = joinElectedMaster(masterNode);

    // 成功加入之前选择的临时 master 节点,则结束线程,否则重新选举
    synchronized (stateMutex) {
        if (success) {
             DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
             if (currentMasterNode == null) {
                 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
             } else if (currentMasterNode.equals(masterNode) == false) {
joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
             }
             joinThreadControl.markThreadAsDone(currentThread);
        } else {
            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
        }
    }
}

选举完成

NodeJoinController::checkPendingJoinsAndElectIfNeeded 在节点获得足够的得票时使节点成为 master,并发布新的集群状态

private synchronized void checkPendingJoinsAndElectIfNeeded() {
    // 计算节点得票数
    final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
    if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
        ...
    } else {
        // 得票数足够,成为 master
        electionContext.closeAndBecomeMaster();
    }
}

public synchronized int getPendingMasterJoinsCount() {
    int pendingMasterJoins = 0;
    // 统计节点得票数,只计算拥有 master 资格节点的投票
    for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
        if (node.isMasterNode()) {
            pendingMasterJoins++;
        }
    }
    return pendingMasterJoins;
}