Nacos Consistency Protocol Implementation: A Brief Analysis of the Distro Protocol


Original post: www.liaochuntao.cn/2019/09/16/…

Background Reading

A Brief Look at How DistroConsistencyServiceImpl Works in Nacos

My earlier article only scratched the surface, so this time I want to properly work through Distro, the AP protocol developed in-house at Alibaba and used by Nacos.

Core Code Implementation

Data synchronization at Nacos Naming module startup

DistroConsistencyServiceImpl

public void load() throws Exception {
  if (SystemUtils.STANDALONE_MODE) {
    initialized = true;
    return;
  }
  // size = 1 means only myself in the list, we need at least one other server alive:
  // in cluster mode, wait until the list holds at least two nodes before proceeding
  while (serverListManager.getHealthyServers().size() <= 1) {
    Thread.sleep(1000L);
    Loggers.DISTRO.info("waiting server list init...");
  }

  // iterate over all healthy cluster nodes
  for (Server server : serverListManager.getHealthyServers()) {
    // skip ourselves: no need to sync data with our own node
    if (NetUtils.localServer().equals(server.getKey())) {
      continue;
    }
    if (Loggers.DISTRO.isDebugEnabled()) {
      Loggers.DISTRO.debug("sync from " + server);
    }
    // pull a full snapshot from another server; one success is enough, incremental sync tasks handle the rest
    if (syncAllDataFromRemote(server)) {
      initialized = true;
      return;
    }
  }
}
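
A detail worth noting: load() returns as soon as a single full pull succeeds. One peer is enough because every healthy Distro node holds the complete data set, not just the keys it is responsible for; the /datums endpoint below reflects this, serializing the entire dataMap in one response.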
The full data pull

What the pulling side does

public boolean syncAllDataFromRemote(Server server) {
  try {
    // fetch the full data set from the peer
    byte[] data = NamingProxy.getAllData(server.getKey());
    // process the received data
    processData(data);
    return true;
  } catch (Exception e) {
    Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
    return false;
  }
}

What the data provider responds with

@RequestMapping(value = "/datums", method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception {
  // serialize the storage container (the Map) directly and send it back
  String content = new String(serializer.serialize(dataStore.getDataMap()), StandardCharsets.UTF_8);
  return ResponseEntity.ok(content);
}

Next, what happens after the full data set has been pulled from one of the Server Nodes

public void processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // first, deserialize the data
        Map<String, Datum<Instances>> datumMap =
            serializer.deserializeMap(data, Instances.class);

        // walk through the received entries
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            // put the datum into the storage container, the DataStore
            dataStore.put(entry.getKey(), entry.getValue());
            // if no listener is registered for this key, this is data we have never seen before
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service does not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    // call back the service-meta listener to announce the new Service
                    listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                        .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }
        // replay the received data through the registered listeners
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }

            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
}
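
Note the two passes. The first pass creates any services that are missing locally; announcing a new Service through the service-meta listener is also what ends up registering a RecordListener for the corresponding key. By the second pass, every key should therefore have a listener to replay its instance data into, which is why a missing listener there "should not happen".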

That wraps up the one-off full data sync that the Distro protocol performs when the Nacos Naming module first starts. Next up is incremental synchronization, and before that we need to introduce a core Distro concept: the authoritative Server.

The authoritative Server mapper

public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    // decide whether this node is allowed to handle the given instance data
    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    // hash the ServiceName to the index of its authoritative node; if that index covers this node, this node may handle the data
    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    // look up the address of the authoritative Server for a ServiceName
    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);

            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {

    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}

The component above is a key piece of the Distro protocol: it hashes a piece of data (by its service name) to locate the authoritative node in the cluster's healthy server list.
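
To make the mapping concrete, here is a minimal, self-contained sketch of the same hash-modulo idea (the server addresses and service names are made up for illustration). Because every node computes the same hash over the same healthy list, all nodes agree on each key's owner without any extra coordination:

import java.util.Arrays;
import java.util.List;

public class DistroHashDemo {

    // same idea as DistroMapper#distroHash: a non-negative hash of the service name
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        // a hypothetical healthy server list
        List<String> healthyList = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");

        for (String serviceName : Arrays.asList("order-service", "user-service", "pay-service")) {
            // same idea as DistroMapper#mapSrv: owner index = hash % cluster size
            String owner = healthyList.get(distroHash(serviceName) % healthyList.size());
            System.out.printf("%s -> %s%n", serviceName, owner);
        }
    }
}

The flip side of plain modulo hashing is that whenever the healthy list changes size, many keys remap to different owners, which is exactly why DistroMapper refreshes healthyList through onChangeHealthyServerList.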

Incremental data sync between nodes

public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        // build one TaskScheduler per CPU core
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // submit the scheduler to the global dispatch executor
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // hash the key to pick a TaskScheduler and enqueue the task there
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    // poll one task from the buffer queue (with a timeout)
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    // skip when there is no cluster or the server list is empty
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    // buffer the key for batched processing, to avoid firing one sync request per key
                    keys.add(key);
                    dataSize++;
                    // once the buffer reaches the configured batch size, or the dispatch period has elapsed, dispatch the sync
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // create one SyncTask per server
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // submit the task with a delay of 0, i.e. execute immediately
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
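
addTask relies on UtilsAndCommons.shakeUp to map a key to a stable scheduler index, so updates to the same key always land on the same TaskScheduler and keep their order. Below is a minimal sketch of that bucketing idea, assuming only a stable, non-negative hash modulo the scheduler count; it is an illustration, not the actual shakeUp source:

public class KeyBucketingSketch {

    // illustration only: a stable, non-negative bucket index in [0, bucketCount),
    // standing in for UtilsAndCommons.shakeUp (whose exact implementation may differ)
    static int bucketOf(String key, int bucketCount) {
        return (key.hashCode() & Integer.MAX_VALUE) % bucketCount;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // the same key always yields the same index, so per-key updates stay ordered
        System.out.println(bucketOf("com.alibaba.nacos.naming.iplist.demo", cores));
    }
}
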
DataSyncer: the actual executor of data sync tasks

public class DataSyncer {

    ...

    @PostConstruct
    public void init() {
        // start the periodic data sync task (runs every five seconds)
        startTimedSync();
    }

    // submit a sync task
    public void submit(SyncTask task, long delay) {
        // If it's a new task:
        if (task.getRetryCount() == 0) {
            // iterate over all keys in this task
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                // record the key in taskMap so duplicate sync tasks are not submitted
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    // the key is already being synced, drop it from this task
                    iterator.remove();
                }
            }
        }
        // if every key was removed, there is nothing left to submit
        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }
        // run the actual data sync asynchronously
        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }
            // the keys this task is supposed to sync
            List<String> keys = task.getKeys();
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            // batch-fetch the datums by key
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            // the data has already been removed, cancel this task
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }
            // serialize the data
            byte[] data = serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            // push the incremental data to the target node
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            // on failure, rebuild the SyncTask with an incremented retry count and schedule a retry
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    // retry a failed task
    public void retrySync(SyncTask syncTask) {
        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list, ignore this task:
            return;
        }
        // TODO may choose other retry policy.
        // resubmit with the configured retry delay
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    // the periodic task:
    // broadcast the checksums of the data this node is responsible for to the other Server nodes
    public class TimedSync implements Runnable {

        @Override
        public void run() {
            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }
                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                // walk every key in the data store
                for (String key : dataStore.keys()) {
                    // if this node is not the authoritative Server for this key, it must not broadcast it to the cluster
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }
                    // fetch the datum
                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    // add it to the broadcast list
                    keyChecksums.put(key, datum.value.getChecksum());
                }
                if (keyChecksums.isEmpty()) {
                    return;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }
                // broadcast to every cluster node except ourselves
                for (Server member : getServers()) {
                    if (NetUtils.localServer().equals(member.getKey())) {
                        continue;
                    }
                    // send the checksum map to the peer
                    NamingProxy.syncCheckSums(keyChecksums, member.getKey());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }
    }

    public List<Server> getServers() {
        return serverListManager.getHealthyServers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}
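
The TimedSync loop is Distro's anti-entropy mechanism: rather than re-shipping full datum payloads every cycle, each node periodically advertises only the checksums of the keys it is authoritative for, and receivers pull just what differs. Steady-state traffic therefore scales with the number of keys rather than with the size of the data.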

So what does another node do when it receives this checksum broadcast?

@RequestMapping(value = "/checksum", method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // which node this data came from
    String source = WebUtils.required(request, "source");
    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
    // deserialize the checksum map
    Map<String, String> dataMap = serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, String>>() {});
    // hand the checksums over for processing
    consistencyService.onReceiveChecksums(dataMap, source);
    return ResponseEntity.ok("ok");
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    // mark the data from this server as in-process
    syncChecksumTasks.put(server, "1");
    try {
        // keys that need updating
        List<String> toUpdateKeys = new ArrayList<>();
        // keys that need removing
        List<String> toRemoveKeys = new ArrayList<>();
        // walk through the received checksums
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // if the batch contains a key this node is responsible for, abort the whole sync (it violates the authoritative-Server contract)
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // if the local store does not contain this key, or the checksums differ, the key needs an update
            if (!dataStore.contains(entry.getKey()) ||
                dataStore.get(entry.getKey()).value == null ||
                !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        // now walk every key in the local data store
        for (String key : dataStore.keys()) {
            // skip keys the source server is not responsible for
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            // if the broadcast no longer contains this key, it should be removed locally
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        }
        // remove the keys that are gone
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            // pull fresh data for the keys that need updating and process it just like the initial full sync
            byte[] result = NamingProxy.getData(toUpdateKeys, server);
            processData(result);
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove the 'in process' flag for this source server:
        syncChecksumTasks.remove(server);
    }
}
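
Stripped of its Nacos context, the reconciliation in onReceiveChecksums reduces to a set diff between the sender's advertised checksums and the receiver's local store. Here is a minimal standalone sketch with made-up keys and checksum values:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChecksumDiffSketch {

    public static void main(String[] args) {
        // checksums advertised by the authoritative server (hypothetical values)
        Map<String, String> remote = new HashMap<>();
        remote.put("service-a", "ck-1");
        remote.put("service-b", "ck-2");

        // the receiver's local view of the same keys
        Map<String, String> local = new HashMap<>();
        local.put("service-a", "ck-1");   // up to date
        local.put("service-b", "ck-old"); // stale, needs an update
        local.put("service-c", "ck-3");   // no longer advertised, needs removal

        List<String> toUpdate = new ArrayList<>();
        List<String> toRemove = new ArrayList<>();

        // keys the sender has that we lack, or hold a stale copy of, must be pulled
        for (Map.Entry<String, String> e : remote.entrySet()) {
            if (!e.getValue().equals(local.get(e.getKey()))) {
                toUpdate.add(e.getKey());
            }
        }
        // keys we still hold for this sender that it no longer advertises must be dropped
        for (String key : local.keySet()) {
            if (!remote.containsKey(key)) {
                toRemove.add(key);
            }
        }

        // prints: toUpdate=[service-b], toRemove=[service-c]
        System.out.println("toUpdate=" + toUpdate + ", toRemove=" + toRemove);
    }
}

In the real method, the removal pass is additionally filtered by distroMapper.mapSrv so that only keys owned by the reporting server are eligible, and the updates are fetched with NamingProxy.getData and fed back through processData, closing the loop with the full sync path we started from.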