注册中心 Eureka 源码解析 —— 应用实例注册发现(七)之增量获取

2,057 阅读14分钟
原文链接: www.iocoder.cn
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-delta/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本



1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取增量注册信息的过程

前置阅读:《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》

FROM 《深度剖析服务发现组件Netflix Eureka》

Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

本文重点在于增量获取

推荐 Spring Cloud 书籍

2. 应用集合一致性哈希码

Applications.appsHashCode ,应用集合一致性哈希码

增量获取注册的应用集合( Applications ) 时,Eureka-Client 会获取到:

  1. Eureka-Server 近期变化( 注册、下线 )的应用集合
  2. Eureka-Server 应用集合一致性哈希码

Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,Eureka-Client 重新和 Eureka-Server 全量获取应用集合。

Eureka 比较应用集合一致性哈希码,和日常我们通过哈希码比较两个对象是否相等类似。

2.1 计算公式

appsHashCode = ${status}_${count}_

  • 使用每个应用实例状态( status ) + 数量( count )拼接出一致性哈希码。若数量为 0 ,该应用实例状态不进行拼接。状态以字符串大小排序

  • 举个例子,8 个 UP ,0 个 DOWN ,则 appsHashCode = UP_8_ 。8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP_8_

  • 实现代码如下:

    // Applications.java
    public String getReconcileHashCode() {
       // 计数集合 key:应用实例状态
       TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
       populateInstanceCountMap(instanceCountMap);
       // 计算 hashcode
       return getReconcileHashCode(instanceCountMap);
    }
    

    • 调用 #populateInstanceCountMap() 方法,计算每个应用实例状态的数量。实现代码如下:

      // Applications.java
      public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) {
         for (Application app : this.getRegisteredApplications()) {
             for (InstanceInfo info : app.getInstancesAsIsFromEureka()) {
                 // 计数
                 AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info.getStatus().name(),
                         k -> new AtomicInteger(0));
                 instanceCount.incrementAndGet();
             }
         }
      }
      
      public List<Application> getRegisteredApplications() {
         return new ArrayList<Application>(this.applications);
      }
      
      // Applications.java
      public List<InstanceInfo> getInstancesAsIsFromEureka() {
         synchronized (instances) {
            return new ArrayList<InstanceInfo>(this.instances);
         }
      }
      

      • 计数那块代码,使用 Integer 即可,无需使用 AtomicInteger 。
  • 调用 #getReconcileHashCode() 方法,计算 hashcode 。实现代码如下:

    public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
       StringBuilder reconcileHashCode = new StringBuilder(75);
       for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
           reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER) // status
                   .append(mapEntry.getValue().get()).append(STATUS_DELIMITER); // count
       }
       return reconcileHashCode.toString();
    }
    

2.2 合理性

本小节,建议你理解完全文后,再回到此处
本小节,建议你理解完全文后,再回到此处
本小节,建议你理解完全文后,再回到此处

笔者刚看完应用集合一致性哈希算法的计算公式,处于一脸懵逼的状态。这么精简的方式真的能够校验出数据的一致性么?不晓得有多少读者跟笔者有一样的疑惑。下面我们来论证该算法的合理性( 一本正经的胡说八道 )。

一致性哈希值通过状态 + 数量来计算,那么是不是可能状态总数是一样多,实际分布在不同的应用?那么我们列举模型如下:

UP
应用A m
应用B n

如果此时应用A 下线了 c 个原应用实例,应用B 注册了 c 个信应用实例,那么处于 UP 状态的数量仍然是 m + n 个。

  • 正常情况下,Eureka-Client 从 Eureka-Server 获取到完整的增量变化并合并,此时应用情况如下表格所示,两者是一致的,一致性哈希算法合理
UP (server) UP (client)
应用A m - c m - c
应用B n + c n + c
  • 异常情况下【1】,变更记录队列全部过期。那 Eureka-Client 从 Eureka-Server 获取到空的增量变化并合并,此时应用情况如下表格所示,两者应用是不相同的, 一致性哈希值却是相等的,一致性哈希算法不合理
UP (server) UP (client)
应用A m - c m
应用B n + c n
  • 异常情况下【2】,变更记录队列部分过期,例如应用A 和 应用B 都剩余 w 条变更记录。那 Eureka-Client 从 Eureka-Server 获取到部分的增量变化并合并,两者应用是不相同的,此时应用情况如下表格所示,一致性哈希值却是相等的,一致性哈希算法不合理
UP (server) UP (client)
应用A m - c m - w
应用B n + c n + w

What ? 从异常情况【1】【2】可以看到,一致性哈希算法竟然是不合理的,那么我们手动来做一次最精简的实验。实验如下:

  • 模拟场景:异常情况【1】,m = n = c = 1 。简单粗暴。
  • 特别配置
    • eureka.retentionTimeInMSInDeltaQueue = 1 ,变更记录队列每条记录存活时长 1 ms。用以实现 Eureka-Client 请求不到完整的增量变化。
    • eureka.deltaRetentionTimerIntervalInMs = 1 ,变更记录队列每条记录过期定时任务执行频率 1 ms。用以实现 Eureka-Client 请求不到完整的增量变化。
    • eureka.shouldUseReadOnlyResponseCache = false ,禁用响应缓存的只读缓存。用以避免等待缓存刷新。
    • eureka.waitTimeInMsWhenSyncEmpty = 1
  • 实验过程
    1. 00:00 启动 Eureka-Server
    2. 00:30 启动应用A ,向 Eureka-Server 注册
    3. 01:00 启动 Eureka-Client ,向 Eureka-Server 获取注册信息,等待获取到应用A
    4. 01:30 关闭应用A 。立即启动应用B ,向 Eureka-Server 注册
    5. 等待 5 分钟,Eureka-Client 无法获取到应用B
    6. 此时应用情况如下表格所示,两者应用是不相同的,一致性哈希值却是相等的,一致性哈希算法不合理。
UP (server) UP (client)
应用A 0 1
应用B 1 0

🙂结论🙂

当然排除掉特别极端的场景,Eureka-Client 从 Eureka-Server 因为网络异常导致一直同步不到增量变化,又恰好应用关闭和开启满足状态统计数量。另外,变更记录队列记录过期时长为 300 秒,增量获取频率为 30 秒,获取的次数有 10 次左右。所以,应用集合一致性哈希码在绝大多数场景是合理的笔者的YY,解决这个极小场景有如下方式:

  • 第一种,修改计算公式 appsHashCode = MD5(${app_name}_${instance_id}_${status}_${count}_) ,增加对应用名和应用实例编号敏感。
  • 第二种,每 N 分钟进行一次全量获取注册信息。

ps :笔者怀着忐忑的心写完了这个小节,如果有不合理的地方,又或者有不同观点的胖友,欢迎一起探讨。谢谢。

TODO[0027][反思]:应用集合一致性哈希算法。

3. Eureka-Client 发起增量获取

《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「2.4 发起获取注册信息」 里,调用 DiscoveryClient#getAndUpdateDelta(...) 方法,增量获取注册信息,并刷新本地缓存,实现代码如下:

 1: private void getAndUpdateDelta(Applications applications) throws Throwable {
 2:     long currentUpdateGeneration = fetchRegistryGeneration.get();
 3: 
 4:     // 增量获取注册信息
 5:     Applications delta = null;
 6:     EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
 7:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
 8:         delta = httpResponse.getEntity();
 9:     }
10: 
11:     if (delta == null) {
12:         // 增量获取为空,全量获取
13:         logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
14:                 + "Hence got the full registry.");
15:         getAndStoreFullRegistry();
16:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
17:         logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
18:         String reconcileHashCode = "";
19:         if (fetchRegistryUpdateLock.tryLock()) {
20:             try {
21:                 // 将变化的应用集合和本地缓存的应用集合进行合并
22:                 updateDelta(delta);
23:                 // 计算本地的应用集合一致性哈希码
24:                 reconcileHashCode = getReconcileHashCode(applications);
25:             } finally {
26:                 fetchRegistryUpdateLock.unlock();
27:             }
28:         } else {
29:             logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
30:         }
31:         // There is a diff in number of instances for some reason
32:         if (!reconcileHashCode.equals(delta.getAppsHashCode()) // 一致性哈希值不相等
33:                 || clientConfig.shouldLogDeltaDiff()) { //
34:             reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
35:         }
36:     } else {
37:         logger.warn("Not updating application delta as another thread is updating it already");
38:         logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
39:     }
40: }

  • 第 4 至 9 行 :请求增量获取注册信息,实现代码如下:

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse<Applications> getDelta(String... regions) {
       return getApplicationsInternal("apps/delta", regions);
    }
    

    • 调用 AbstractJerseyEurekaHttpClient#getApplicationsInternal(...) 方法,GET 请求 Eureka-Server 的 apps/detla 接口,参数为 regions ,返回格式为 JSON ,实现增量获取注册信息。
  • 第 11 至 15 行 :增量获取失败,调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。该方法在 《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「2.4.1 全量获取注册信息,并设置到本地缓存」 有详细解析。

  • 第 16 至 35 行 :处理增量获取的结果。

    • 第 16 行 :TODO[0025] :并发更新的情况???
    • 第 19 行 :TODO[0025] :并发更新的情况???
    • 第 21 行 :调用 #updateDelta(...) 方法,将变化的应用集合和本地缓存的应用集合进行合并。
    • 第 31 至 35 行 :一致性哈希值不相等,调用 #reconcileAndLogDifference() 方法,全量获取注册信息,并设置到本地缓存,和 #getAndStoreFullRegistry() 基本类似。
      • 第 33 行 :配置 eureka.printDeltaFullDiff ,是否打印增量和全量差异。默认值 :false 。从目前代码实现上来看,暂时没有生效。注意 :开启该参数会导致每次增量获取后又发起全量获取,不要开启。

3.1 合并应用集合

调用 #updateDelta(...) 方法,将变化的应用集合和本地缓存的应用集合进行合并。实现代码如下:

 1: private void updateDelta(Applications delta) {
 2:     int deltaCount = 0;
 3:     for (Application app : delta.getRegisteredApplications()) { // 循环增量(变化)应用集合
 4:         for (InstanceInfo instance : app.getInstances()) {
 5:             Applications applications = getApplications();
 6:             // TODO[0009]:RemoteRegionRegistry
 7:             String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
 8:             if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
 9:                 Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
10:                 if (null == remoteApps) {
11:                     remoteApps = new Applications();
12:                     remoteRegionVsApps.put(instanceRegion, remoteApps);
13:                 }
14:                 applications = remoteApps;
15:             }
16: 
17:             ++deltaCount;
18:             if (ActionType.ADDED.equals(instance.getActionType())) { // 添加
19:                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
20:                 if (existingApp == null) {
21:                     applications.addApplication(app);
22:                 }
23:                 logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
24:                 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
25:             } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改
26:                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
27:                 if (existingApp == null) {
28:                     applications.addApplication(app);
29:                 }
30:                 logger.debug("Modified instance {} to the existing apps ", instance.getId());
31: 
32:                 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
33:             } else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除
34:                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
35:                 if (existingApp == null) {
36:                     applications.addApplication(app);
37:                 }
38:                 logger.debug("Deleted instance {} to the existing apps ", instance.getId());
39:                 applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
40:             }
41:         }
42:     }
43:     logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
44: 
45:     getApplications().setVersion(delta.getVersion());
46:     // 过滤、打乱应用集合
47:     getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
48: 
49:     // TODO[0009]:RemoteRegionRegistry
50:     for (Applications applications : remoteRegionVsApps.values()) {
51:         applications.setVersion(delta.getVersion());
52:         applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
53:     }
54: }

  • 第 6 至 15 行 :TODO[0009]:RemoteRegionRegistry

  • 第 18 至 24 行 :添加( ADDED )应用实例时,调用 Application#addInstance(...) 方法,实现代码如下:

    // Application.java
    public void addInstance(InstanceInfo i) {
       // 添加到 应用实例映射
       instancesMap.put(i.getId(), i);
       synchronized (instances) {
           // 移除原有实例
           instances.remove(i);
           // 添加新实例
           instances.add(i);
           // 设置 isDirty ,目前只用于 `#toString()` 方法打印,无业务逻辑
           isDirty = true;
       }
    }
    
    // InstanceInfo.java
    @Override
    public int hashCode() { // 只使用 ID 计算 hashcode
       String id = getId();
       return (id == null) ? 31 : (id.hashCode() + 31);
    }
    
    @Override
    public boolean equals(Object obj) { // 只对比 ID
       if (this == obj) {
           return true;
       }
       if (obj == null) {
           return false;
       }
       if (getClass() != obj.getClass()) {
           return false;
       }
       InstanceInfo other = (InstanceInfo) obj;
       String id = getId();
       if (id == null) {
           if (other.getId() != null) {
               return false;
           }
       } else if (!id.equals(other.getId())) {
           return false;
       }
       return true;
    }
    

  • 第 25 至 32 行 :修改( MODIFIED )应用实例时,同样调用 Application#addInstance(...) 方法。

  • 第 33 至 40 行 :删除( DELETED )应用实例时,调用 Application#removeInstance(...) 方法,实现代码如下:

    public void removeInstance(InstanceInfo i) {
        removeInstance(i, true);
    }
    
    private void removeInstance(InstanceInfo i, boolean markAsDirty) {
        // 移除 应用实例映射
        instancesMap.remove(i.getId());
        synchronized (instances) {
            // 移除 应用实例
            instances.remove(i);
            if (markAsDirty) {
                // 设置 isDirty ,目前只用于 `#toString()` 方法打印,无业务逻辑
                isDirty = true;
            }
        }
    }
    

  • 第 47 行 :调用 Applications#shuffleInstances(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。

  • 第 49 至 53 行 :TODO[0009]:RemoteRegionRegistry

4. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收增量获取请求,映射 ApplicationsResource#getContainers() 方法。

3.2 最近租约变更记录队列

AbstractInstanceRegistry.recentlyChangedQueue,最近租约变更记录队列。实现代码如下:

// AbstractInstanceRegistry.java
/**
* 最近租约变更记录队列
*/
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

/**
* 最近租约变更记录
*/
private static final class RecentlyChangedItem {
   /**
    * 最后更新时间戳
    */
   private long lastUpdateTime;
   /**
    * 租约
    */
   private Lease<InstanceInfo> leaseInfo;

   public RecentlyChangedItem(Lease<InstanceInfo> lease) {
       this.leaseInfo = lease;
       lastUpdateTime = System.currentTimeMillis();
   }

   public long getLastUpdateTime() {
       return this.lastUpdateTime;
   }

   public Lease<InstanceInfo> getLeaseInfo() {
       return this.leaseInfo;
   }
}

  • 当应用实例注册、下线、状态变更时,创建最近租约变更记录( RecentlyChangedItem ) 到队列。

  • 后台任务定时顺序扫描队列,当 lastUpdateTime 超过一定时长后进行移除。实现代码如下:

    // AbstractInstanceRegistry.java
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                    serverConfig.getDeltaRetentionTimerIntervalInMs(),
                    serverConfig.getDeltaRetentionTimerIntervalInMs());
                    
    private TimerTask getDeltaRetentionTask() {
       return new TimerTask() {
    
           @Override
           public void run() {
               Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
               while (it.hasNext()) {
                   if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                       it.remove();
                   } else {
                       break;
                   }
               }
           }
    
       };
    }
    

    • 配置 eureka.deltaRetentionTimerIntervalInMs, 移除队列里过期的租约变更记录的定时任务执行频率,单位:毫秒。默认值 :30 * 1000 毫秒。
    • 配置 eureka.retentionTimeInMSInDeltaQueue,租约变更记录过期时长,单位:毫秒。默认值 : 3 * 60 * 1000 毫秒。

3.3 缓存读取

《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「3.3 缓存读取」 里,在 #generatePayload() 方法里,调用 AbstractInstanceRegistry#getApplicationDeltas(...) 方法,获取近期变化的应用集合,实现代码如下:

// AbstractInstanceRegistry.java
  1: public Applications getApplicationDeltas() {
  2:     // 添加 增量获取次数 到 监控
  3:     GET_ALL_CACHE_MISS_DELTA.increment();
  4:     // 初始化 变化的应用集合
  5:     Applications apps = new Applications();
  6:     apps.setVersion(responseCache.getVersionDelta().get());
  7:     Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
  8:     try {
  9:         // 获取写锁
 10:         write.lock();
 11:         // 获取 最近租约变更记录队列
 12:         Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
 13:         logger.debug("The number of elements in the delta queue is :" + this.recentlyChangedQueue.size());
 14:         // 拼装 变化的应用集合
 15:         while (iter.hasNext()) {
 16:             Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
 17:             InstanceInfo instanceInfo = lease.getHolder();
 18:             Object[] args = {instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()};
 19:             logger.debug("The instance id %s is found with status %s and actiontype %s", args);
 20:             Application app = applicationInstancesMap.get(instanceInfo.getAppName());
 21:             if (app == null) {
 22:                 app = new Application(instanceInfo.getAppName());
 23:                 applicationInstancesMap.put(instanceInfo.getAppName(), app);
 24:                 apps.addApplication(app);
 25:             }
 26:             app.addInstance(decorateInstanceInfo(lease));
 27:         }
 28: 
 29:         // TODO[0009]:RemoteRegionRegistry
 30:         boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
 31:         if (!disableTransparentFallback) {
 32:             Applications allAppsInLocalRegion = getApplications(false);
 33: 
 34:             for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
 35:                 Applications applications = remoteRegistry.getApplicationDeltas();
 36:                 for (Application application : applications.getRegisteredApplications()) {
 37:                     Application appInLocalRegistry =
 38:                             allAppsInLocalRegion.getRegisteredApplications(application.getName());
 39:                     if (appInLocalRegistry == null) {
 40:                         apps.addApplication(application);
 41:                     }
 42:                 }
 43:             }
 44:         }
 45: 
 46:         // 获取全量应用集合,通过它计算一致性哈希值
 47:         Applications allApps = getApplications(!disableTransparentFallback);
 48:         apps.setAppsHashCode(allApps.getReconcileHashCode());
 49:         return apps;
 50:     } finally {
 51:         write.unlock();
 52:     }
 53: }

666. 彩蛋

知识星球

在汉堡王写完这篇热情的博客。为什么用“热情”这个字眼呢?大夏天的,竟然不开空调的!对的,没有开空调,简直是个小火炉。恩,不过静心写完这篇文章,让我还是挺嗨皮的。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?