配置中心 Apollo 源码解析 —— Client 轮询配置

1,699 阅读24分钟
原文链接: www.iocoder.cn
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 www.iocoder.cn/Apollo/clie… 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》

本文分享 Client 如何通过轮询的方式,从 Config Service 读取配置。Client 的轮询包括两部分:

  1. RemoteConfigRepository ,定时轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口。
  2. RemoteConfigLongPollService ,轮询 Config Service 的配置变更通知 /notifications/v2 接口。

整体流程如下图:流程

  • 一个 Namespace 对应一个 RemoteConfigRepository 。
  • 多个 RemoteConfigRepository ,注册到全局唯一的 RemoteConfigLongPollService 中。

为什么是这样的设计呢?老艿艿请教了 Apollo 的作者宋老师。聊天内容如下,非常感谢:

  • 老艿艿:

    • github.com/ctripcorp/a…
    • 看这个 issue 问了
    • 问题:非常感谢。为什么不在 long polling 的返回结果中直接返回更新的结果呢?
    • 回答:这样推送消息就是有状态了,做不到幂等了,会带来很多问题。
    • [呲牙] 周末打扰哈。带来的问题主要是哪些哈?
  • 张大佬:

    • 可能会有数据的丢失,如果使用只推送变更通知的话,即使丢失了,还是能在下一次变更的时候达到一个一致的状态
  • 宋老师:

    • @老艿艿 主要是幂等考虑
    • 加载配置接口是幂等的,推送配置的话就做不到了,因为和推送顺序相关
  • 老艿艿:

    • 推送顺序指的是?
  • 张大佬:

    • 我想的是,比如两次修改操作的通知,由于网络问题,客户端接收到的顺序可能是反的,如果这两次修改的是同一个key,就有问题了。
  • 老艿艿:

    • 从目前代码看下来,长轮询发起,同时只会有一个,应该不会同时两个通知呀。
    • 现在 client 是一个长轮询+定时轮询,是不是这两个会有互相的影响。
  • 张大佬:

    • @宋老师 嗯嗯
  • 宋老师:

    • 目前推送是单连接走http的,所以问题可能不大,不过在设计上而言是有这个问题的,比如如果推送是走的tcp长连接的话。另外,长轮询和推送之间也会有冲突,如果连续两次配置变化,就可能造成双写。还有一点,就是保持逻辑的简单,目前的做法推送只负责做简单的通知,不需要去计算客户端的配置应该是什么,因为计算逻辑挺复杂的,需要考虑集群,关联,灰度等。
  • 老艿艿:

    • 现在的设计思路上,是不是可以理解.
    • 1、client 的定时轮询,可以保持最终一致。
    • 2、client 的长轮询,是定时轮训的“实时”补充,通过这样的方式,让流程统一?
  • 宋老师:

    • 总而言之,就是在满足幂等性,实时性的基础上保持设计的简单
    • 是的,推拉结合
  • 张大佬:

    • 长轮询个推送直接的冲突没太理解
    • 有没有可能有这种问题,一次长轮询中消息丢失了,但是长轮询还在
  • 老艿艿:

      1. 长轮询的通知里面,带有配置信息
      1. 定时轮训,里面也拿到配置信息
    • 这个时候,client 是没办法判断哪个配置信息是新的。
    • @张大佬 我认为是可能的,这个时候,client 可以发起新的长轮询。
  • 宋大佬:

    • @张大佬 长轮询和推送的冲突,这个更正为定时轮询和推送的冲突
  • 老艿艿:

    • 通知是定时轮询配置的补充。有了通知,立马轮询。不用在定时了
  • 张大佬:

    • get
  • 老艿艿:

    • 谢谢宋老师和张大佬[坏笑][坏笑]

2. ConfigRepository

com.ctrip.framework.apollo.internals.ConfigRepository ,配置 Repository 接口。代码如下:

public interface ConfigRepository {

    /**
     * Get the config from this repository.
     *
     * @return config
     */
    Properties getConfig();

    /**
     * Set the fallback repo for this repository.
     *
     * @param upstreamConfigRepository the upstream repo
     */
    void setUpstreamRepository(ConfigRepository upstreamConfigRepository);

    /**
     * Add change listener.
     *
     * @param listener the listener to observe the changes
     */
    void addChangeListener(RepositoryChangeListener listener);

    /**
     * Remove change listener.
     *
     * @param listener the listener to remove
     */
    void removeChangeListener(RepositoryChangeListener listener);

}
  • ConfigRepository ,作为 Client 的 Repository ( 类似 DAO ) ,读取配置。
  • #getConfig() 方法,读取配置。
  • #setUpstreamRepository(ConfigRepository) 方法,设置上游的 Repository 。主要用于 LocalFileConfigRepository ,从 Config Service 读取配置,缓存在本地文件。
  • RepositoryChangeListener ,监听 Repository 的配置的变化。🙂
    • #addChangeListener(RepositoryChangeListener)
    • #removeChangeListener(RepositoryChangeListener)

子类如下图所示:类图

  • 本文不分享 LocalFileConfigRepository 。

2.1 AbstractConfigRepository

com.ctrip.framework.apollo.internals.AbstractConfigRepository ,实现 ConfigRepository 接口,配置 Repository 抽象类

2.1.1 同步配置

/**
 * 尝试同步
 *
 * @return 是否同步成功
 */
protected boolean trySync() {
    try {
        // 同步
        sync();
        // 返回同步成功
        return true;
    } catch (Throwable ex) {
        // 【TODO 6001】Tracer 日志
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil.getDetailMessage(ex));
    }
    // 返回同步失败
    return false;
}

/**
 * 同步配置
 */
protected abstract void sync();
  • #sync() 抽象方法,同步配置。

2.1.2 监听器

/**
 * RepositoryChangeListener 数组
 */
private List<RepositoryChangeListener> m_listeners = Lists.newCopyOnWriteArrayList();

@Override
public void addChangeListener(RepositoryChangeListener listener) {
    if (!m_listeners.contains(listener)) {
        m_listeners.add(listener);
    }
}

@Override
public void removeChangeListener(RepositoryChangeListener listener) {
    m_listeners.remove(listener);
}

/**
 * 触发监听器们
 *
 * @param namespace Namespace 名字
 * @param newProperties 配置
 */
protected void fireRepositoryChange(String namespace, Properties newProperties) {
    // 循环 RepositoryChangeListener 数组
    for (RepositoryChangeListener listener : m_listeners) {
        try {
            // 触发监听器
            listener.onRepositoryChange(namespace, newProperties);
        } catch (Throwable ex) {
            // 【TODO 6001】Tracer 日志
            Tracer.logError(ex);
            logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
        }
    }
}

2.2 RemoteConfigRepository

com.ctrip.framework.apollo.internals.RemoteConfigRepository ,实现 AbstractConfigRepository 抽象类,远程配置 Repository 。实现从 Config Service 拉取配置,并缓存在内存中。并且,定时 + 实时刷新缓存。

2.2.1 构造方法

private static final Logger logger = LoggerFactory.getLogger(RemoteConfigRepository.class);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");

private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();

/**
 * 远程配置长轮询服务
 */
private RemoteConfigLongPollService remoteConfigLongPollService;
/**
 * 指向 ApolloConfig 的 AtomicReference ,缓存配置
 */
private volatile AtomicReference<ApolloConfig> m_configCache;
/**
 * Namespace 名字
 */
private final String m_namespace;
/**
 * ScheduledExecutorService 对象
 */
private final static ScheduledExecutorService m_executorService;
/**
 * 指向 ServiceDTO( Config Service 信息) 的 AtomicReference
 */
private AtomicReference<ServiceDTO> m_longPollServiceDto;
/**
 * 指向 ApolloNotificationMessages 的 AtomicReference
 */
private AtomicReference<ApolloNotificationMessages> m_remoteMessages;
/**
 * 加载配置的 RateLimiter
 */
private RateLimiter m_loadConfigRateLimiter;
/**
 * 是否强制拉取缓存的标记
 *
 * 若为 true ,则多一轮从 Config Service 拉取配置
 * 为 true 的原因,RemoteConfigRepository 知道 Config Service 有配置刷新
 */
private AtomicBoolean m_configNeedForceRefresh;
/**
 * 失败定时重试策略,使用 {@link ExponentialSchedulePolicy}
 */
private SchedulePolicy m_loadConfigFailSchedulePolicy;
private Gson gson;
private ConfigUtil m_configUtil;
private HttpUtil m_httpUtil;
private ConfigServiceLocator m_serviceLocator;

static {
    // 单线程池
    m_executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("RemoteConfigRepository", true));
}

/**
 * Constructor.
 *
 * @param namespace the namespace
 */
public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(), m_configUtil.getOnErrorRetryInterval() * 8);
    gson = new Gson();
    // 尝试同步配置
    this.trySync();
    // 初始化定时刷新配置的任务
    this.schedulePeriodicRefresh();
    // 注册自己到 RemoteConfigLongPollService 中,实现配置更新的实时通知
    this.scheduleLongPollingRefresh();
}
  • 基础属性
    • m_namespace 属性,Namespace 名字。一个 RemoteConfigRepository 对应一个 Namespace 。
    • m_configCache 属性,指向 ApolloConfig 的 AtomicReference ,缓存配置
  • 轮询属性
    • m_remoteMessages 属性,指向 ApolloNotificationMessages 的 AtomicReference 。
    • m_executorService 属性,ScheduledExecutorService 对象,线程大小为 1
    • m_loadConfigRateLimiter 属性,加载配置的 RateLimiter 。
    • m_loadConfigFailSchedulePolicy ,失败定时重试策略,使用 ExponentialSchedulePolicy 实现类,区间范围是 [1, 8] 秒。详细解析,见 「4. SchedulePolicy」
  • 通知属性
    • remoteConfigLongPollService 属性,远程配置长轮询服务。
    • m_longPollServiceDto 属性,长轮询到通知的 Config Service 信息。在下一次轮询配置时,优先从该 Config Service 请求。
    • m_configNeedForceRefresh 属性,是否强制拉取缓存的标记。
      • 若为 true ,则多一轮从 Config Service 拉取配置。
      • true 的原因,RemoteConfigRepository 知道 Config Service 有配置刷新,例如有新的通知的情况下。
      • 比较绕,下面看了代码会更加好理解。
  • 构造方法

    • 调用 #trySync() 方法,尝试同步配置,作为初次的配置缓存初始化。
    • 调用 #schedulePeriodicRefresh() 方法,初始化定时刷新配置的任务。代码如下:

      private void schedulePeriodicRefresh() {
          logger.debug("Schedule periodic refresh with interval: {} {}", m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
          // 创建定时任务,定时刷新配置
          m_executorService.scheduleAtFixedRate(new Runnable() {
              @Override
              public void run() {
                  // 【TODO 6001】Tracer 日志
                  Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
                  logger.debug("refresh config for namespace: {}", m_namespace);
                  // 尝试同步配置
                  trySync();
                  // 【TODO 6001】Tracer 日志
                  Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
              }
          }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
      }
      
      • 5 分钟,调用 #trySync() 方法,同步配置。
    • 调用 #scheduleLongPollingRefresh() 方法,将自己注册到 RemoteConfigLongPollService 中,实现配置更新的实时通知。代码如下:

      private void scheduleLongPollingRefresh() {
          remoteConfigLongPollService.submit(m_namespace, this);
      }
      
      • 当 RemoteConfigLongPollService 长轮询到该 RemoteConfigRepository 的 Namespace 下的配置更新时,会回调 #onLongPollNotified(ServiceDTO, ApolloNotificationMessages) 方法,在 「2.2.4 onLongPollNotified」 中,详细解析。

2.2.2 getConfigServices

#getConfigServices() 方法,获得所有 Config Service 信息。代码如下:

private List<ServiceDTO> getConfigServices() {
    List<ServiceDTO> services = m_serviceLocator.getConfigServices();
    if (services.size() == 0) {
        throw new ApolloConfigException("No available config service");
    }
    return services;
}
  • 通过 ConfigServiceLocator ,可获得 Config Service 集群的地址们。在后续和注册发现相关的文章,详细解析 ConfigServiceLocator 。这里,我们只需要有这么一回事即可。

2.2.3 assembleQueryConfigUrl

#assembleQueryConfigUrl() 方法,组装轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口的 URL ,代码如下:

String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
                              String dataCenter, ApolloNotificationMessages remoteMessages, ApolloConfig previousConfig) {
    String path = "configs/%s/%s/%s"; // /configs/{appId}/{clusterName}/{namespace:.+}
    List<String> pathParams = Lists.newArrayList(pathEscaper.escape(appId), pathEscaper.escape(cluster), pathEscaper.escape(namespace));
    Map<String, String> queryParams = Maps.newHashMap();
    // releaseKey
    if (previousConfig != null) {
        queryParams.put("releaseKey", queryParamEscaper.escape(previousConfig.getReleaseKey()));
    }
    // dataCenter
    if (!Strings.isNullOrEmpty(dataCenter)) {
        queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
    }
    // ip
    String localIp = m_configUtil.getLocalIp();
    if (!Strings.isNullOrEmpty(localIp)) {
        queryParams.put("ip", queryParamEscaper.escape(localIp));
    }
    // messages
    if (remoteMessages != null) {
        queryParams.put("messages", queryParamEscaper.escape(gson.toJson(remoteMessages)));
    }
    // 格式化 URL
    String pathExpanded = String.format(path, pathParams.toArray());
    // 拼接 Query String
    if (!queryParams.isEmpty()) {
        pathExpanded += "?" + MAP_JOINER.join(queryParams);
    }
    // 拼接最终的请求 URL
    if (!uri.endsWith("/")) {
        uri += "/";
    }
    return uri + pathExpanded;
}

###2.2.4 onLongPollNotified

#onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) 方法,当长轮询到配置更新时,发起同步配置的任务。代码如下:

 1: public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
 2:     // 设置长轮询到配置更新的 Config Service 。下次同步配置时,优先读取该服务
 3:     m_longPollServiceDto.set(longPollNotifiedServiceDto);
 4:     // 设置 m_remoteMessages
 5:     m_remoteMessages.set(remoteMessages);
 6:     // 提交同步任务
 7:     m_executorService.submit(new Runnable() {
 8:
 9:         @Override
10:         public void run() {
11:             // 设置 m_configNeedForceRefresh 为 true
12:             m_configNeedForceRefresh.set(true);
13:             // 尝试同步配置
14:             trySync();
15:         }
16:
17:     });
18: }
  • 第 3 行:设置长轮询到配置更新的 Config Service 。下次同步配置时,优先读取该服务。
  • 第 5 行:设置 m_remoteMessages
  • 第 6 至 17 行:提交配置同步任务。
    • 第 12 行:设置 m_configNeedForceRefreshtrue
    • 第 14 行:调用 #trySync() 方法,同步配置。

2.2.5 sync

#sync() 实现方法,从 Config Service 同步配置。代码如下:

 1: @Override
 2: protected synchronized void sync() {
 3:     // 【TODO 6001】Tracer 日志
 4:     Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
 5:     try {
 6:         // 获得缓存的 ApolloConfig 对象
 7:         ApolloConfig previous = m_configCache.get();
 8:         // 从 Config Service 加载 ApolloConfig 对象
 9:         ApolloConfig current = loadApolloConfig();
10:
11:         // reference equals means HTTP 304
12:         // 若不相等,说明更新了,设置到缓存中
13:         if (previous != current) {
14:             logger.debug("Remote Config refreshed!");
15:             // 设置到缓存
16:             m_configCache.set(current);
17:             // 发布 Repository 的配置发生变化,触发对应的监听器们
18:             super.fireRepositoryChange(m_namespace, this.getConfig());
19:         }
20:         // 【TODO 6001】Tracer 日志
21:         if (current != null) {
22:             Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()), current.getReleaseKey());
23:         }
24:         // 【TODO 6001】Tracer 日志
25:         transaction.setStatus(Transaction.SUCCESS);
26:     } catch (Throwable ex) {
27:         // 【TODO 6001】Tracer 日志
28:         transaction.setStatus(ex);
29:         throw ex;
30:     } finally {
31:         // 【TODO 6001】Tracer 日志
32:         transaction.complete();
33:     }
34: }
  • 第 7 行:获得缓存 m_configCache 的 ApolloConfig 对象。
  • 第 9 行:调用 #loadApolloConfig() 方法,从 Config Service 加载 ApolloConfig 对象。详细解析,见
  • 第 13 至 19 行:若缓存的和加载的 ApolloConfig 对象不同,说明更新了,设置缓存中。
    • 第 16 行:设置加载到的 ApolloConfig 到缓存 m_configCache 中。
    • 第 18 行:调用 #fireRepositoryChange(m_namespace, ApolloConfig) 方法,发布 Repository 的配置发生变化,触发对应的监听器们。。详细解析,见 「2.2.6 loadApolloConfig」

2.2.6 loadApolloConfig

#loadApolloConfig() 方法,从 Config Service 加载 ApolloConfig 对象。代码如下:

  1: private ApolloConfig loadApolloConfig() {
  2:     // 限流
  3:     if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
  4:         // wait at most 5 seconds
  5:         try {
  6:             TimeUnit.SECONDS.sleep(5);
  7:         } catch (InterruptedException e) {
  8:         }
  9:     }
 10:     // 获得 appId cluster dataCenter 配置信息
 11:     String appId = m_configUtil.getAppId();
 12:     String cluster = m_configUtil.getCluster();
 13:     String dataCenter = m_configUtil.getDataCenter();
 14:     Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
 15:     // 计算重试次数
 16:     int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
 17:     long onErrorSleepTime = 0; // 0 means no sleep
 18:     Throwable exception = null;
 19:     // 获得所有的 Config Service 的地址
 20:     List<ServiceDTO> configServices = getConfigServices();
 21:     String url = null;
 22:     // 循环读取配置重试次数直到成功。每一次,都会循环所有的 ServiceDTO 数组。
 23:     for (int i = 0; i < maxRetries; i++) {
 24:         // 随机所有的 Config Service 的地址
 25:         List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
 26:         Collections.shuffle(randomConfigServices);
 27:         // 优先访问通知配置变更的 Config Service 的地址。并且,获取到时,需要置空,避免重复优先访问。
 28:         // Access the server which notifies the client first
 29:         if (m_longPollServiceDto.get() != null) {
 30:             randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
 31:         }
 32:         // 循环所有的 Config Service 的地址
 33:         for (ServiceDTO configService : randomConfigServices) {
 34:             // sleep 等待,下次从 Config Service 拉取配置
 35:             if (onErrorSleepTime > 0) {
 36:                 logger.warn("Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}", onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
 37:                 try {
 38:                     m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
 39:                 } catch (InterruptedException e) {
 40:                     //ignore
 41:                 }
 42:             }
 43:             // 组装查询配置的地址
 44:             url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace, dataCenter, m_remoteMessages.get(), m_configCache.get());
 45:
 46:             logger.debug("Loading config from {}", url);
 47:             // 创建 HttpRequest 对象
 48:             HttpRequest request = new HttpRequest(url);
 49:
 50:             // 【TODO 6001】Tracer 日志
 51:             Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
 52:             transaction.addData("Url", url);
 53:             try {
 54:                 // 发起请求,返回 HttpResponse 对象
 55:                 HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
 56:                 // 设置 m_configNeedForceRefresh = false
 57:                 m_configNeedForceRefresh.set(false);
 58:                 // 标记成功
 59:                 m_loadConfigFailSchedulePolicy.success();
 60:
 61:                 // 【TODO 6001】Tracer 日志
 62:                 transaction.addData("StatusCode", response.getStatusCode());
 63:                 transaction.setStatus(Transaction.SUCCESS);
 64:
 65:                 // 无新的配置,直接返回缓存的 ApolloConfig 对象
 66:                 if (response.getStatusCode() == 304) {
 67:                     logger.debug("Config server responds with 304 HTTP status code.");
 68:                     return m_configCache.get();
 69:                 }
 70:
 71:                 // 有新的配置,进行返回新的 ApolloConfig 对象
 72:                 ApolloConfig result = response.getBody();
 73:                 logger.debug("Loaded config for {}: {}", m_namespace, result);
 74:                 return result;
 75:             } catch (ApolloConfigStatusCodeException ex) {
 76:                 ApolloConfigStatusCodeException statusCodeException = ex;
 77:                 // 若返回的状态码是 404 ,说明查询配置的 Config Service 不存在该 Namespace 。
 78:                 // config not found
 79:                 if (ex.getStatusCode() == 404) {
 80:                     String message = String.format("Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
 81:                                     "please check whether the configs are released in Apollo!", appId, cluster, m_namespace);
 82:                     statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(), message);
 83:                 }
 84:                 // 【TODO 6001】Tracer 日志
 85:                 Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
 86:                 transaction.setStatus(statusCodeException);
 87:                 // 设置最终的异常
 88:                 exception = statusCodeException;
 89:             } catch (Throwable ex) {
 90:                 // 【TODO 6001】Tracer 日志
 91:                 Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
 92:                 transaction.setStatus(ex);
 93:                 // 设置最终的异常
 94:                 exception = ex;
 95:             } finally {
 96:                 // 【TODO 6001】Tracer 日志
 97:                 transaction.complete();
 98:             }
 99:             // 计算延迟时间
100:             // if force refresh, do normal sleep, if normal config load, do exponential sleep
101:             onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() : m_loadConfigFailSchedulePolicy.fail();
102:         }
103:
104:     }
105:     // 若查询配置失败,抛出 ApolloConfigException 异常
106:     String message = String.format("Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s", appId, cluster, m_namespace, url);
107:     throw new ApolloConfigException(message, exception);
108: }
  • 第 3 至 9 行:调用 RateLimiter#tryAcquire(long timeout, TimeUnit unit) 方法,判断是否被限流。若限流,sleep 5 秒,避免对 Config Service 请求过于频繁。
  • 第 10 至 13 行:获得 appId cluster dataCenter 配置信息。
  • 第 16 行:计算重试次数。若 m_configNeedForceRefreshtrue ,代表强制刷新配置,会多重试一次。
  • 第 20 行:调用 #getConfigServices() 方法,获得所有的 Config Service 的地址。
  • ========== 第一层循环 ==========
  • 第 23 至 104 行:循环读取配置重试次数直到成功。每一次,都会循环所有的 ServiceDTO 数组
  • 第 24 至 26 行:创建新的 Config Service 数组,并随机打乱。因为【第 29 至 31 行】可能会添加新的 Config Service 元素,如果不创建新的数组,会修改了原有数组。
  • 第 27 至 31 行:若 m_longPollServiceDto 非空,优先访问通知配置变更的 Config Service 的地址。并且,获取到时,需要置空,避免重复优先访问。
  • ========== 第二层循环 ==========
  • 第 33 至 102 行:循环所有的 Config Service 的地址,读取配置重试次数直到成功。
  • 第 34 至 42 行:若 onErrorSleepTime 大于零,sleep 等待,下次从 Config Service 拉取配置。在【第 101 行】,若请求失败一次 Config Service 时,会计算一次下一次请求的延迟时间。因为是每次请求失败一次 Config Service 时就计算一次,所以延迟时间的上限为 8 秒,比较短。
  • 第 44 行:调用 #assembleQueryConfigUrl(...) 方法,组装查询配置的地址。
  • 第 48 行:创建 HttpRequest 对象。
  • 第 55 行:调用 HttpUtil#doGet(request, Class) 方法,发起请求,返回 HttpResponse 对象。
  • 第 57 行:设置 m_configNeedForceRefreshfalse
  • 第 59 行:调用 SchedulePolicy#success() 方法,标记成功。
  • 第 65 至 69 行:若返回的状态码是 304新的配置,直接返回缓存的 ApolloConfig 对象。
  • 第 71 至 74 行:新的配置,创建 ApolloConfig 对象,并返回。
  • 第 75 至 94 行:异常相关的处理,胖友自己看注释。
  • 第 101 行:计算延迟时间。若 m_configNeedForceRefresh 为:
    • true 时,调用 ConfigUtil#getOnErrorRetryInterval() 方法,返回 2 。因为已经知道有配置更新,所以减短重试间隔。
    • false 时,调用 SchedulePolicy#fail() 方法,计算下次重试延迟时间。
  • ========== 最外层 ==========
  • 第 105 至 107 行:若查询配置失败,抛出 ApolloConfigException 异常。

2.2.7 getConfig

#getConfig() 实现方法,获得配置。代码如下:

1: @Override
2: public Properties getConfig() {
3:     // 如果缓存为空,强制从 Config Service 拉取配置
4:     if (m_configCache.get() == null) {
5:         this.sync();
6:     }
7:     // 转换成 Properties 对象,并返回
8:     return transformApolloConfigToProperties(m_configCache.get());
9: }
  • 第 3 至 6 行:若果缓存为空,调用 #sync() 方法,强制从 Config Service 拉取配置。
  • 第 8 行:调用 #transformApolloConfigToProperties(ApolloConfig) 对象,转换成 Properties 对象,并返回。代码如下:

    private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig) {
        Properties result = new Properties();
        result.putAll(apolloConfig.getConfigurations());
        return result;
    }
    

3. RemoteConfigLongPollService

com.ctrip.framework.apollo.internals.RemoteConfigLongPollService ,远程配置长轮询服务。负责长轮询 Config Service 的配置变更通知 /notifications/v2 接口。当有新的通知时,触发 RemoteConfigRepository ,立即轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口。

3.1 构造方法

private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLongPollService.class);

private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();

private static final long INIT_NOTIFICATION_ID = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
// 90 seconds, should be longer than server side's long polling timeout, which is now 60 seconds
private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000;

/**
 * 长轮询 ExecutorService
 */
private final ExecutorService m_longPollingService;
/**
 * 是否停止长轮询的标识
 */
private final AtomicBoolean m_longPollingStopped;
/**
 * 失败定时重试策略,使用 {@link ExponentialSchedulePolicy}
 */
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
/**
 * 长轮询的 RateLimiter
 */
private RateLimiter m_longPollRateLimiter;
/**
 * 是否长轮询已经开始的标识
 */
private final AtomicBoolean m_longPollStarted;
/**
 * 长轮询的 Namespace Multimap 缓存
 *
 * 通过 {@link #submit(String, RemoteConfigRepository)} 添加 RemoteConfigRepository 。
 *
 * KEY:Namespace 的名字
 * VALUE:RemoteConfigRepository 集合
 */
private final Multimap<String, RemoteConfigRepository> m_longPollNamespaces;
/**
 * 通知编号 Map 缓存
 *
 * KEY:Namespace 的名字
 * VALUE:最新的通知编号
 */
private final ConcurrentMap<String, Long> m_notifications;
/**
 * 通知消息 Map 缓存
 *
 * KEY:Namespace 的名字
 * VALUE:ApolloNotificationMessages 对象
 */
private final Map<String, ApolloNotificationMessages> m_remoteNotificationMessages;//namespaceName -> watchedKey -> notificationId
private Type m_responseType;
private Gson gson;
private ConfigUtil m_configUtil;
private HttpUtil m_httpUtil;
private ConfigServiceLocator m_serviceLocator;

public RemoteConfigLongPollService() {
    m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
    m_longPollingStopped = new AtomicBoolean(false);
    m_longPollingService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("RemoteConfigLongPollService", true));
    m_longPollStarted = new AtomicBoolean(false);
    m_longPollNamespaces = Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
    m_notifications = Maps.newConcurrentMap();
    m_remoteNotificationMessages = Maps.newConcurrentMap();
    m_responseType = new TypeToken<List<ApolloConfigNotification>>() {}.getType();
    gson = new Gson();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
}
  • 基础属性
    • m_longPollNamespaces 属性,注册的长轮询的 Namespace Multimap 缓存。
    • m_notifications 属性,通知编号 Map 缓存。
    • m_remoteNotificationMessages 属性,通知消息 Map 缓存。
  • 轮询属性
    • m_longPollingService 属性,长轮询 ExecutorService ,线程大小为 1
    • m_longPollingStopped 属性,是否停止长轮询的标识。
    • m_longPollStarted 属性,是否长轮询已经开始的标识。
    • m_loadConfigRateLimiter 属性,加载配置的 RateLimiter 。
    • m_longPollFailSchedulePolicyInSecond ,失败定时重试策略,使用 ExponentialSchedulePolicy 实现类,区间范围是 [1, 120] 秒。详细解析,见 「4. SchedulePolicy」

3.2 getConfigServices

「2.2 getConfigServices」 的代码。

3.3 assembleLongPollRefreshUrl

#assembleLongPollRefreshUrl(...) 方法,轮询 Config Service 的配置变更通知 /notifications/v2 接口的 URL ,代码如下:

String assembleLongPollRefreshUrl(String uri, String appId, String cluster, String dataCenter, Map<String, Long> notificationsMap) {
    Map<String, String> queryParams = Maps.newHashMap();
    queryParams.put("appId", queryParamEscaper.escape(appId));
    queryParams.put("cluster", queryParamEscaper.escape(cluster));
    // notifications
    queryParams.put("notifications", queryParamEscaper.escape(assembleNotifications(notificationsMap)));
    // dataCenter
    if (!Strings.isNullOrEmpty(dataCenter)) {
        queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
    }
    // ip
    String localIp = m_configUtil.getLocalIp();
    if (!Strings.isNullOrEmpty(localIp)) {
        queryParams.put("ip", queryParamEscaper.escape(localIp));
    }
    // 创建 Query String
    String params = MAP_JOINER.join(queryParams);
    // 拼接 URL
    if (!uri.endsWith("/")) {
        uri += "/";
    }
    return uri + "notifications/v2?" + params;
}

String assembleNotifications(Map<String, Long> notificationsMap) {
    // 创建 ApolloConfigNotification 数组
    List<ApolloConfigNotification> notifications = Lists.newArrayList();
    // 循环,添加 ApolloConfigNotification 对象
    for (Map.Entry<String, Long> entry : notificationsMap.entrySet()) {
        ApolloConfigNotification notification = new ApolloConfigNotification(entry.getKey(), entry.getValue());
        notifications.add(notification);
    }
    // JSON 化成字符串
    return gson.toJson(notifications);
}

3.4 submit

#submit(namespace, RemoteConfigRepository) 方法,提交 RemoteConfigRepository 到长轮询任务。代码如下:

 1: public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
 2:     // 添加到 m_longPollNamespaces 中
 3:     boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
 4:     // 添加到 m_notifications 中
 5:     m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
 6:     // 若未启动长轮询定时任务,进行启动
 7:     if (!m_longPollStarted.get()) {
 8:         startLongPolling();
 9:     }
10:     return added;
11: }
  • 第 3 行:添加到 m_longPollNamespaces.put 中。
  • 第 5 行:添加到 m_notifications 中。
  • 第 6 至 9 行:若未启动长轮询定时任务,调用 #startLongPolling() 方法,进行启动。

3.5 startLongPolling

#startLongPolling() 方法,启动长轮询任务。代码如下:

 1: private void startLongPolling() {
 2:     // CAS 设置长轮询任务已经启动。若已经启动,不重复启动。
 3:     if (!m_longPollStarted.compareAndSet(false, true)) {
 4:         //already started
 5:         return;
 6:     }
 7:     try {
 8:         // 获得 appId cluster dataCenter 配置信息
 9:         final String appId = m_configUtil.getAppId();
10:         final String cluster = m_configUtil.getCluster();
11:         final String dataCenter = m_configUtil.getDataCenter();
12:         // 获得长轮询任务的初始化延迟时间,单位毫秒。
13:         final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
14:         // 提交长轮询任务。该任务会持续且循环执行。
15:         m_longPollingService.submit(new Runnable() {
16:             @Override
17:             public void run() {
18:                 // 初始等待
19:                 if (longPollingInitialDelayInMills > 0) {
20:                     try {
21:                         logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
22:                         TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
23:                     } catch (InterruptedException e) {
24:                         //ignore
25:                     }
26:                 }
27:                 // 执行长轮询
28:                 doLongPollingRefresh(appId, cluster, dataCenter);
29:             }
30:         });
31:     } catch (Throwable ex) {
32:         // 设置 m_longPollStarted 为 false
33:         m_longPollStarted.set(false);
34:         // 【TODO 6001】Tracer 日志
35:         ApolloConfigException exception = new ApolloConfigException("Schedule long polling refresh failed", ex);
36:         Tracer.logError(exception);
37:         logger.warn(ExceptionUtil.getDetailMessage(exception));
38:     }
39: }
  • 第 2 至 6 行:CAS 设置长轮询任务已经启动。若已经启动,不重复启动。
  • 第 8 至 11 行:获得 appId cluster dataCenter 配置信息。
  • 第 13 行:调用 ConfigUtil#getLongPollingInitialDelayInMills() 方法,获得长轮询任务的初始化延迟时间,单位毫秒。默认,2000 毫秒。
  • 第 14 至 30 行:提交长轮询任务。该任务会持续且循环执行。
    • 第 18 至 26 行:sleep ,初始等待。
    • 第 28 行:调用 #doLongPollingRefresh(appId, cluster, dataCenter) 方法,执行长轮询任务。
  • 第 31 至 38 行:初始化失败的异常处理,胖友自己看代码注释。

3.6 doLongPollingRefresh

#doLongPollingRefresh() 方法,持续执行长轮询。代码如下:

 1: private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
 2:     final Random random = new Random();
 3:     ServiceDTO lastServiceDto = null;
 4:     // 循环执行,直到停止或线程中断
 5:     while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
 6:         // 限流
 7:         if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
 8:             // wait at most 5 seconds
 9:             try {
10:                 TimeUnit.SECONDS.sleep(5);
11:             } catch (InterruptedException e) {
12:             }
13:         }
14:         // 【TODO 6001】Tracer 日志
15:         Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
16:         String url = null;
17:         try {
18:             // 获得 Config Service 的地址
19:             if (lastServiceDto == null) {
20:                 // 获得所有的 Config Service 的地址
21:                 List<ServiceDTO> configServices = getConfigServices();
22:                 lastServiceDto = configServices.get(random.nextInt(configServices.size()));
23:             }
24:             // 组装长轮询通知变更的地址
25:             url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications);
26: 
27:             logger.debug("Long polling from {}", url);
28:             // 创建 HttpRequest 对象,并设置超时时间
29:             HttpRequest request = new HttpRequest(url);
30:             request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
31: 
32:             // 【TODO 6001】Tracer 日志
33:             transaction.addData("Url", url);
34: 
35:             // 发起请求,返回 HttpResponse 对象
36:             final HttpResponse<List<ApolloConfigNotification>> response = m_httpUtil.doGet(request, m_responseType);
37:             logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
38: 
39:             // 有新的通知,刷新本地的缓存
40:             if (response.getStatusCode() == 200 && response.getBody() != null) {
41:                 // 更新 m_notifications
42:                 updateNotifications(response.getBody());
43:                 // 更新 m_remoteNotificationMessages
44:                 updateRemoteNotifications(response.getBody());
45:                 // 【TODO 6001】Tracer 日志
46:                 transaction.addData("Result", response.getBody().toString());
47:                 // 通知对应的 RemoteConfigRepository 们
48:                 notify(lastServiceDto, response.getBody());
49:             }
50: 
51:             // 无新的通知,重置连接的 Config Service 的地址,下次请求不同的 Config Service ,实现负载均衡。
52:             // try to load balance
53:             if (response.getStatusCode() == 304 && random.nextBoolean()) { // 随机
54:                 lastServiceDto = null;
55:             }
56:             // 标记成功
57:             m_longPollFailSchedulePolicyInSecond.success();
58:             // 【TODO 6001】Tracer 日志
59:             transaction.addData("StatusCode", response.getStatusCode());
60:             transaction.setStatus(Transaction.SUCCESS);
61:         } catch (Throwable ex) {
62:             // 重置连接的 Config Service 的地址,下次请求不同的 Config Service
63:             lastServiceDto = null;
64:             // 【TODO 6001】Tracer 日志
65:             Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
66:             transaction.setStatus(ex);
67:             // 标记失败,计算下一次延迟执行时间
68:             long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
69:             logger.warn("Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
70:                     sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
71:             // 等待一定时间,下次失败重试
72:             try {
73:                 TimeUnit.SECONDS.sleep(sleepTimeInSecond);
74:             } catch (InterruptedException ie) {
75:                 //ignore
76:             }
77:         } finally {
78:             transaction.complete();
79:         }
80:     }
81: }
  • 第 5 至 80 行:循环执行,直到停止或线程中断。
  • 第 7 至 13 行:调用 RateLimiter#tryAcquire(long timeout, TimeUnit unit) 方法,判断是否被限流。若限流,sleep 5 秒,避免对 Config Service 请求过于频繁。
  • 第 19 至 23 行:若无 lastServiceDto 对象,随机获得 Config Service 的地址。
  • 第 25 行:调用 #assembleLongPollRefreshUrl(...) 方法,组装长轮询通知变更的地址。
  • 第 29 至 30 行:创建 HttpRequest 对象,并设置超时时间。默认超时时间为 90 秒,大于 Config Service 的通知接口的 60 秒。
  • 第 36 行:调用 HttpUtil#doGet(request, Class) 方法,发起请求,返回 HttpResponse 对象。
  • 第 40 至 49 行:若返回状态码为 200 ,说明有新的通知,刷新本地的缓存。
    • 第 42 行:调用 #updateNotifications(List<ApolloConfigNotification>) 方法,更新 m_notifications 。详细解析,在 「3.7 updateNotifications」
    • 第 44 行:调用 #updateRemoteNotifications(List<ApolloConfigNotification>) 方法,更新 m_remoteNotificationMessages 。详细解析,在 「3.8 updateRemoteNotifications」
    • 第 48 行:调用 #notify(ServiceDTO, List<ApolloConfigNotification>) 方法,通知对应的 RemoteConfigRepository 们。详细解析,在 「3.9 notify」
  • 第 51 至 55 行:若返回状态码为 304 ,说明无新的通知,随机,重置连接的 Config Service 的地址,下次请求不同的 Config Service ,实现负载均衡。
  • 第 57 行:调用 SchedulePolicy#success() 方法,标记成功。
  • 第 61 至 76 行:处理异常。
    • 第 63 行:重置连接的 Config Service 的地址 lastServiceDto ,下次请求不同的 Config Service。
    • 第 67 至 70 行:调用 SchedulePolicy#fail() 方法,标记失败,计算下一次延迟执行时间。
    • 第 71 至 76 行:sleep,等待一定时间,下次失败重试。

3.7 updateNotifications

#updateNotifications(List<ApolloConfigNotification>) 方法,更新 m_notifications 。代码如下:

private void updateNotifications(List<ApolloConfigNotification> deltaNotifications) {
    // 循环 ApolloConfigNotification
    for (ApolloConfigNotification notification : deltaNotifications) {
        if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
            continue;
        }
        // 更新 m_notifications
        String namespaceName = notification.getNamespaceName();
        if (m_notifications.containsKey(namespaceName)) {
            m_notifications.put(namespaceName, notification.getNotificationId());
        }
        // 因为 .properties 在默认情况下被过滤掉,所以我们需要检查是否有 .properties 后缀的通知。如有,更新 m_notifications
        // since .properties are filtered out by default, so we need to check if there is notification with .properties suffix
        String namespaceNameWithPropertiesSuffix = String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue());
        if (m_notifications.containsKey(namespaceNameWithPropertiesSuffix)) {
            m_notifications.put(namespaceNameWithPropertiesSuffix, notification.getNotificationId());
        }
    }
}

3.8 updateRemoteNotifications

#updateRemoteNotifications(List<ApolloConfigNotification>) 方法,更新 m_remoteNotificationMessages 。代码如下:

private void updateRemoteNotifications(List<ApolloConfigNotification> deltaNotifications) {
    // 循环 ApolloConfigNotification
    for (ApolloConfigNotification notification : deltaNotifications) {
        if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
            continue;
        }
        if (notification.getMessages() == null || notification.getMessages().isEmpty()) {
            continue;
        }
        // 若不存在 Namespace 对应的 ApolloNotificationMessages ,进行创建
        ApolloNotificationMessages localRemoteMessages = m_remoteNotificationMessages.get(notification.getNamespaceName());
        if (localRemoteMessages == null) {
            localRemoteMessages = new ApolloNotificationMessages();
            m_remoteNotificationMessages.put(notification.getNamespaceName(), localRemoteMessages);
        }
        // 合并通知消息到 ApolloNotificationMessages 中
        localRemoteMessages.mergeFrom(notification.getMessages());
    }
}

3.9 notify

#notify(ServiceDTO, List<ApolloConfigNotification>) 方法,更新 m_remoteNotificationMessages 。代码如下:

private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
    if (notifications == null || notifications.isEmpty()) {
        return;
    }
    // 循环 ApolloConfigNotification
    for (ApolloConfigNotification notification : notifications) {
        String namespaceName = notification.getNamespaceName(); // Namespace 的名字
        // 创建 RemoteConfigRepository 数组,避免并发问题
        // create a new list to avoid ConcurrentModificationException
        List<RemoteConfigRepository> toBeNotified = Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
        // 因为 .properties 在默认情况下被过滤掉,所以我们需要检查是否有监听器。若有,添加到 RemoteConfigRepository 数组
        // since .properties are filtered out by default, so we need to check if there is any listener for it
        toBeNotified.addAll(m_longPollNamespaces.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
        // 获得远程的 ApolloNotificationMessages 对象,并克隆
        ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
        ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
        // 循环 RemoteConfigRepository ,进行通知
        for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
            try {
                // 进行通知
                remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
            } catch (Throwable ex) {
                // 【TODO 6001】Tracer 日志
                Tracer.logError(ex);
            }
        }
    }
}

4. SchedulePolicy

com.ctrip.framework.apollo.core.schedule.SchedulePolicy ,定时策略接口。在 Apollo 中,用于执行失败,计算下一次执行的延迟时间。代码如下:

public interface SchedulePolicy {

    /**
     * 执行失败
     *
     * @return 下次执行延迟
     */
    long fail();

    /**
     * 执行成功
     */
    void success();

}

4.1 ExponentialSchedulePolicy

com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy ,实现 SchedulePolicy 接口,基于指数级计算的定时策略实现类。代码如下:

public class ExponentialSchedulePolicy implements SchedulePolicy {

    /**
     * 延迟时间下限
     */
    private final long delayTimeLowerBound;
    /**
     * 延迟时间上限
     */
    private final long delayTimeUpperBound;
    /**
     * 最后延迟执行时间
     */
    private long lastDelayTime;

    public ExponentialSchedulePolicy(long delayTimeLowerBound, long delayTimeUpperBound) {
        this.delayTimeLowerBound = delayTimeLowerBound;
        this.delayTimeUpperBound = delayTimeUpperBound;
    }

    @Override
    public long fail() {
        long delayTime = lastDelayTime;
        // 设置初始时间
        if (delayTime == 0) {
            delayTime = delayTimeLowerBound;
        // 指数级计算,直到上限
        } else {
            delayTime = Math.min(lastDelayTime << 1, delayTimeUpperBound);
        }
        // 最后延迟执行时间
        lastDelayTime = delayTime;
        // 返回
        return delayTime;
    }

    @Override
    public void success() {
        lastDelayTime = 0;
    }

    public static void main(String[] args) {
        ExponentialSchedulePolicy policy = new ExponentialSchedulePolicy(1, 120);
        for (int i = 0; i < 10; i++) {
            System.out.println(policy.fail());
        }
    }

}
  • 每次执行失败,调用 #fail() 方法,指数级计算新的延迟执行时间。
  • 举例如下:

    delayTimeLowerBound, delayTimeUpperBound= [1, 120] 执行 10 轮
    1 2 4 8 16 32 64 120 120 120
    delayTimeLowerBound, delayTimeUpperBound= [30, 120] 执行 10 轮
    30 60 120 120 120 120 120 120 120 120 120 120
    

666. 彩蛋

😈 整个配置加载的流程完成!!!我的天!!!

充实的周天。

知识星球