原文链接:www.liaochuntao.cn/2019/09/16/…
前期导读
Nacos 中的 DistroConsistencyServiceImpl 工作浅析
之前的文章说的很浅显,这次打算重头好好解析下Nacos
中使用的alibaba
自研的AP
协议——Distro
核心代码实现
Nacos Naming 模块启动时做的数据同步
DistroConsistencyServiceImpl
/**
 * Bootstraps Distro data on startup: waits for the cluster to form, then pulls
 * a full data snapshot from the first healthy peer that answers.
 *
 * @throws Exception if interrupted while waiting for the server list
 */
public void load() throws Exception {
    // Standalone deployments have no peers to pull data from.
    if (SystemUtils.STANDALONE_MODE) {
        initialized = true;
        return;
    }
    // size = 1 means only myself in the list, we need at least one another server alive:
    while (serverListManager.getHealthyServers().size() <= 1) {
        // Log before blocking so operators see why startup is stalled.
        Loggers.DISTRO.info("waiting server list init...");
        Thread.sleep(1000L);
    }
    // Walk the healthy peers and try each one in turn.
    for (Server server : serverListManager.getHealthyServers()) {
        // Never sync from ourselves.
        if (NetUtils.localServer().equals(server.getKey())) {
            continue;
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            // Parameterized logging instead of string concatenation.
            Loggers.DISTRO.debug("sync from {}", server);
        }
        // One successful full pull is enough; incremental sync covers the rest.
        if (syncAllDataFromRemote(server)) {
            initialized = true;
            return;
        }
    }
}
全量数据拉取的动作
数据拉取执行者的动作
/**
 * Pulls the complete datum snapshot from one remote peer and applies it locally.
 *
 * @param server the peer to pull from
 * @return true when the pull and local processing both succeeded
 */
public boolean syncAllDataFromRemote(Server server) {
    try {
        // Fetch the full data set from the remote node.
        byte[] data = NamingProxy.getAllData(server.getKey());
        // Deserialize, store, and fire listeners for the received data.
        processData(data);
        return true;
    } catch (Exception e) {
        // Parameterized logging; the exception as last argument keeps the stack
        // trace without building the message by concatenation.
        Loggers.DISTRO.error("sync full data from {} failed!", server, e);
        return false;
    }
}
数据提供者的响应
/**
 * Data-provider endpoint: serializes the entire datum map and returns it as a
 * UTF-8 string body for peers performing a full sync.
 */
@RequestMapping(value = "/datums", method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // Serialize the backing map first, then wrap it as a UTF-8 string body.
    byte[] serialized = serializer.serialize(dataStore.getDataMap());
    String content = new String(serialized, StandardCharsets.UTF_8);
    return ResponseEntity.ok(content);
}
接下来,当从某一个Server Node
拉取了全量数据后的操作
/**
 * Applies a batch of datums received from a remote peer: stores them locally,
 * bootstraps services for keys no listener knows yet, then fires the per-key
 * listeners and re-stores each datum whose listeners all succeeded.
 *
 * @param data serialized map of key -> Datum&lt;Instances&gt;; empty array is a no-op
 * @throws Exception propagated from deserialization or listener callbacks
 */
public void processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // Deserialize the raw bytes into key -> Datum<Instances>.
        Map<String, Datum<Instances>> datumMap =
            serializer.deserializeMap(data, Instances.class);
        // First pass: store data and create services for unknown keys.
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            // Store first so the datum is visible before any listener runs.
            dataStore.put(entry.getKey(), entry.getValue());
            // No registered listener for this key => the service is new here.
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    // Notify the service-meta listener so it registers the new
                    // service (and, presumably, a listener for this key).
                    // NOTE(review): assumes a listener exists under
                    // SERVICE_META_KEY_PREFIX; .get(0) would NPE otherwise — confirm.
                    listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                        .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }
        // Second pass: fire the per-key listeners now that services exist.
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }
            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                // A failing listener skips the re-store below for this key.
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }
            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
}
到这里,Nacos Naming
模块的Distro
协议的初次启动时的数据全量同步到这里就告一段落了,接下来就是数据的增量同步了,首先要介绍一个Distro
协议的一个概念——权威Server
权威
Server
的判断器
/**
 * Maps a service name to its authoritative ("responsible") server in the Distro
 * cluster by hashing the name onto the healthy-server list.
 */
public class DistroMapper implements ServerChangeListener {

    // Snapshot of healthy member addresses ("ip:port" keys); replaced wholesale
    // in onChangeHealthyServerList so readers see either the old or new list.
    // NOTE(review): field is not volatile — confirm visibility of the swap is
    // handled elsewhere.
    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        // Register for server-list change callbacks.
        serverListManager.listen(this);
    }

    // True when this node should act on the given instance: health check
    // enabled, the check task still live, this node authoritative for the
    // service, and the instance actually belonging to the cluster.
    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    // Hash the service name onto the healthy list and check whether the target
    // slot falls on this node.
    public boolean responsible(String serviceName) {
        // Distro disabled or standalone: every node handles everything.
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }
        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }
        // First/last occurrence of the local address — the list may contain the
        // local server more than once, hence the inclusive range check below.
        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            // Local server not in the healthy list: answer locally.
            return true;
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    // Resolve the authoritative server address for a service name; falls back
    // to the local server when Distro is disabled or the lookup fails.
    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }
        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            // E.g. the list shrank between size() and get().
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);
            return NetUtils.localServer();
        }
    }

    // Non-negative hash of the service name.
    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
        // Raw membership changes are ignored; only healthy members matter here.
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
        // Rebuild the snapshot, then swap it in with a single assignment.
        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}
上面的组件,就是Distro
协议的一个重要部分,根据数据进行 Hash 计算查找集群节点列表中的权威节点
节点间的数据增量同步
/**
 * Buffers changed keys and dispatches them, in batches, as SyncTasks to every
 * other cluster member (the incremental-sync entry point of Distro).
 */
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    // One scheduler (with its own queue) per CPU core.
    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        // Build the schedulers and hand each one to the global executor as a
        // long-lived task.
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    // Route the key to a scheduler by hash so the same key always lands on the
    // same queue.
    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        // Number of keys buffered in the current (unflushed) batch.
        private int dataSize = 0;

        // Timestamp of the last flush; used for the time-based flush condition.
        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    // Block for at most one dispatch period; a null key simply
                    // means the period elapsed with nothing queued.
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    // Nothing to sync to when there is no (healthy) cluster.
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    // Fresh list per batch: the previous list may still be
                    // referenced by already-submitted SyncTasks.
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    keys.add(key);
                    dataSize++;
                    // Flush when the batch is full, or the dispatch period has
                    // elapsed since the last flush.
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // One SyncTask per remote member (skip ourselves).
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // Delay 0: execute immediately.
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
DataSyncer 数据同步任务的真正执行者
public class DataSyncer {
...
@PostConstruct
public void init() {
    // Start the periodic checksum broadcast (see TimedSync); the article says
    // the period is five seconds — actual value lives in GlobalExecutor, confirm.
    startTimedSync();
}
/**
 * Submits a sync task for asynchronous execution after {@code delay} ms.
 * New tasks (retryCount == 0) are deduplicated per key+targetServer via
 * taskMap so the same key is not synced to the same server concurrently.
 */
public void submit(SyncTask task, long delay) {
    // If it's a new task:
    if (task.getRetryCount() == 0) {
        Iterator<String> iterator = task.getKeys().iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            // putIfAbsent returns the previous value: non-blank means a sync
            // for this key+server is already in flight.
            if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                // associated key already exist:
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync already in process, key: {}", key);
                }
                // Drop the duplicate key from this task.
                iterator.remove();
            }
        }
    }
    // Every key was a duplicate; nothing left to do.
    if (task.getKeys().isEmpty()) {
        // all keys are removed:
        return;
    }
    // Run the actual push asynchronously.
    GlobalExecutor.submitDataSync(() -> {
        // 1. check the server
        if (getServers() == null || getServers().isEmpty()) {
            Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
            return;
        }
        List<String> keys = task.getKeys();
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
        }
        // 2. get the datums by keys and check the datum is empty or not
        Map<String, Datum> datumMap = dataStore.batchGet(keys);
        // Data was removed since the task was queued: clear flags and abort.
        if (datumMap == null || datumMap.isEmpty()) {
            // clear all flags of this task:
            for (String key : keys) {
                taskMap.remove(buildKey(key, task.getTargetServer()));
            }
            return;
        }
        // Serialize and push the batch to the target server.
        byte[] data = serializer.serialize(datumMap);
        long timestamp = System.currentTimeMillis();
        boolean success = NamingProxy.syncData(data, task.getTargetServer());
        if (!success) {
            // Push failed: requeue a copy with an incremented retry count.
            SyncTask syncTask = new SyncTask();
            syncTask.setKeys(task.getKeys());
            syncTask.setRetryCount(task.getRetryCount() + 1);
            syncTask.setLastExecuteTime(timestamp);
            syncTask.setTargetServer(task.getTargetServer());
            retrySync(syncTask);
        } else {
            // clear all flags of this task:
            for (String key : task.getKeys()) {
                taskMap.remove(buildKey(key, task.getTargetServer()));
            }
        }
    }, delay);
}
/**
 * Re-submits a failed sync task after the configured retry delay, unless the
 * target server has dropped out of the healthy list.
 */
public void retrySync(SyncTask syncTask) {
    // Parse the "ip:port" target once instead of splitting the string twice.
    String[] hostAndPort = syncTask.getTargetServer().split(":");
    Server server = new Server();
    server.setIp(hostAndPort[0]);
    server.setServePort(Integer.parseInt(hostAndPort[1]));
    if (!getServers().contains(server)) {
        // if server is no longer in healthy server list, ignore this task:
        return;
    }
    // TODO may choose other retry policy.
    // Delayed re-submit so retries back off instead of spinning.
    submit(syncTask, partitionConfig.getSyncRetryDelay());
}
// Schedules the TimedSync runnable on the shared partition-sync executor.
public void startTimedSync() {
    GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}
/**
 * Periodic task: broadcasts the checksums of every key this node is the
 * authoritative server for, so peers can detect stale or missing data.
 */
public class TimedSync implements Runnable {

    @Override
    public void run() {
        try {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", getServers());
            }
            // send local timestamps to other servers:
            Map<String, String> keyChecksums = new HashMap<>(64);
            for (String key : dataStore.keys()) {
                // Only the authoritative server for a key may broadcast it.
                if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                    continue;
                }
                // Key may have been removed since keys() was taken.
                Datum datum = dataStore.get(key);
                if (datum == null) {
                    continue;
                }
                // Collect key -> checksum for the broadcast payload.
                keyChecksums.put(key, datum.value.getChecksum());
            }
            if (keyChecksums.isEmpty()) {
                return;
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
            }
            // Broadcast the checksum map to every member except ourselves.
            for (Server member : getServers()) {
                if (NetUtils.localServer().equals(member.getKey())) {
                    continue;
                }
                NamingProxy.syncCheckSums(keyChecksums, member.getKey());
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("timed sync task failed.", e);
        }
    }
}
// Current healthy cluster members, fetched fresh from the manager on each call.
public List<Server> getServers() {
    return serverListManager.getHealthyServers();
}
// Composite taskMap key: "<datum key><SPLITER><target server>".
public String buildKey(String key, String targetServer) {
    return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
}
那么其他节点在接受到数据后的操作是什么
/**
 * Receiver endpoint for checksum broadcasts from peers: deserializes the
 * key -> checksum map and hands it to the consistency service.
 */
@RequestMapping(value = "/checksum", method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // Which peer pushed this checksum batch.
    String source = WebUtils.required(request, "source");
    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
    // Deserialize with an explicit charset: the stream was decoded as UTF-8
    // above, but the bare entity.getBytes() re-encoded with the platform
    // default charset, corrupting non-ASCII keys on non-UTF-8 JVMs.
    Map<String, String> dataMap = serializer.deserialize(entity.getBytes(StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() {});
    // Let the consistency service compute update/remove sets and pull data.
    consistencyService.onReceiveChecksums(dataMap, source);
    return ResponseEntity.ok("ok");
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
return;
}
// 标记当前 Server 传来的数据正在处理
syncChecksumTasks.put(server, "1");
try {
// 需要更新的 key
List<String> toUpdateKeys = new ArrayList<>();
// 需要删除的 Key
List<String> toRemoveKeys = new ArrayList<>();
// 对传来的数据进行遍历操作
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
// 如果传来的数据存在由本节点负责的数据,则直接退出本次数据同步操作(违反了权威server的设定要求)
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
// 如果当前数据存储容器不存在这个数据,或者校验值不一样,则进行数据更新操作
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
// 直接遍历数据存储容器的所有数据
for (String key : dataStore.keys()) {
// 如果数据不是 source server 负责的,则跳过
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
// 如果同步的数据不包含这个key,表明这个key是需要被删除的
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
}
// 执行数据闪出去操作
for (String key : toRemoveKeys) {
onRemove(key);
}
if (toUpdateKeys.isEmpty()) {
return;
}
try {
// 根据需要更新的key进行数据拉取,然后对同步的数据进行操作,剩下的如同最开始的全量数据同步所做的操作
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
finally {
// Remove this 'in process' flag:
// 移除本次 source server 的数据同步任务标识
syncChecksumTasks.remove(server);
}
}