注册中心 Eureka 源码解析 —— 网络通信

313 阅读19分钟

摘要: 原创出处 www.iocoder.cn/Eureka/tran… 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Eureka 的网络通信部分。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:

  • Eureka-Client 请求 Eureka-Server 的网络通信
  • Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信

本文涉及类在 com.netflix.discovery.shared.transport 包下,涉及到主体类的类图如下( 打开大图 ):

  • 粉色部分 —— EurekaJerseyClient ,对基于 Jersey Server 的 Eureka-Server 的 Jersey 客户端封装。
  • 绿色部分 —— EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法。如果把 DiscoveryClient 类比成 Service ,那么 EurekaHttpClient 可以类比城 Dao 。
  • 综色部分 —— EurekaHttpClient 实现类,真正实现了具体的 Eureka-Server API 调用方法。
  • 红色部分 —— EurekaHttpClient 委托类,提供了会话、重试、重定向、监控指标收集等特性。
  • 黄色部分 —— EurekaHttpClientFactory,用于创建 EurekaHttpClient 。

类图看起来很复杂,整体调用关系如下( 打开大图 ):

OK ,我们逐层解析,嗨起来。

推荐 Spring Cloud 书籍

推荐 Spring Cloud 视频

2. EurekaHttpClient

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient ,EurekaHttpClient 接口。接口代码如下:

public interface EurekaJerseyClient {
    ApacheHttpClient4 getClient();
    
    void destroyResources();
}
  • com.sun.jersey.client.apache4.ApacheHttpClient4 ,基于 Apache HttpClient4 实现的 Jersey Client 。

2.1 EurekaJerseyClientImpl

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl ,EurekaHttpClient 实现类。实现代码如下:

public class EurekaJerseyClientImpl implements EurekaJerseyClient {
    /**
     * 基于 Apache HttpClient4 实现的 Jersey Client
     */
    private final ApacheHttpClient4 apacheHttpClient;
    /**
     * Apache HttpClient 空闲连接清理器
     */
    private final ApacheHttpClientConnectionCleaner apacheHttpClientConnectionCleaner;
    /**
     * Jersey Client 配置
     */
    ClientConfig jerseyClientConfig;
    public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                                  ClientConfig clientConfig) {
        try {
            jerseyClientConfig = clientConfig;
            // 创建  ApacheHttpClient
            apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);
            // 设置 连接参数
            HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();
            HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
            HttpConnectionParams.setSoTimeout(params, readTimeout);
            // 创建 ApacheHttpClientConnectionCleaner
            this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout);
        } catch (Throwable e) {
            throw new RuntimeException("Cannot create Jersey client", e);
        }
    }
    @Override
    public ApacheHttpClient4 getClient() {
        return apacheHttpClient;
    }
    @Override
    public void destroyResources() {
        apacheHttpClientConnectionCleaner.shutdown();
        apacheHttpClient.destroy();
    }
}
  • com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner ,Apache HttpClient 空闲连接清理器,负责周期性关闭处于 half-close 状态的空闲连接。点击 链接 查看带中文注释的 ApacheHttpClientConnectionCleaner。推荐阅读:《HttpClient容易忽视的细节——连接关闭》

2.2 EurekaJerseyClientBuilder

EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 内部类,用于创建 EurekaJerseyClientImpl 。

调用 #build() 方法,创建 EurekaJerseyClientImpl ,实现代码如下:

// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
    MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
    try {
        return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client ", e);
    }
}
  • MyDefaultApacheHttpClient4Config ,继承自 com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config ,实现自定义配置。点击 链接 查看带中文注释的 MyDefaultApacheHttpClient4Config。例如 :
    • 自定义的请求、响应的编解码器 com.netflix.discovery.provider.DiscoveryJerseyProvider
    • 禁用重定向,使用 RedirectingEurekaHttpClient 实现该特性。
    • 自定义 UserAgent 。
    • 自定义 Http Proxy 。
    • SSL 功能的增强。ApacheHttpClient4 使用的是 Apache HttpClient 4.1.1 版本,com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter 将 Apache HttpClient 4.3.4 对 SSL 功能的增强适配到老版本 API 。点击 链接 查看带中文注释的 SSLSocketFactoryAdapter。

3. EurekaHttpClient

com.netflix.discovery.shared.transport.EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法 。点击 链接 查看带中文注释的 EurekaHttpClient。

3.1 EurekaHttpResponse

com.netflix.discovery.shared.transport.EurekaHttpResponse ,请求响应对象,实现代码如下:

public class EurekaHttpResponse<T> {
    /**
     * 返回状态码
     */
    private final int statusCode;
    /**
     * 返回对象( Entity )
     */
    private final T entity;
    /**
     * 返回 header
     */
    private final Map<String, String> headers;
    /**
     * 重定向地址
     */
    private final URI location;
    
    // ... 省略 setting / getting 和 Builder
}

3.2 TransportClientFactory

com.netflix.discovery.shared.transport.TransportClientFactory ,创建 EurekaHttpClient 的工厂接口。接口代码如下:

public interface TransportClientFactory {
    /**
     * 创建 EurekaHttpClient
     *
     * @param serviceUrl Eureka-Server 地址
     * @return EurekaHttpClient
     */
    EurekaHttpClient newClient(EurekaEndpoint serviceUrl);
    /**
     * 关闭工厂
     */
    void shutdown();
}

大多数 EurekaHttpClient 实现类都有其对应的工厂实现类

4. AbstractJerseyEurekaHttpClient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient ,实现 EurekaHttpClient 的抽象类真正实现了具体的 Eureka-Server API 调用方法。实现代码如下:

 1: public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
 2: 
 3: private static final Logger logger = LoggerFactory.getLogger(AbstractJerseyEurekaHttpClient.class);
 4: 
 5: /**
 6:  * Jersey Client
 7:  */
 8: protected final Client jerseyClient;
 9: /**
10:  * 请求的 Eureka-Server 地址
11:  */
12: protected final String serviceUrl;
13: 
14: protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
15:     this.jerseyClient = jerseyClient;
16:     this.serviceUrl = serviceUrl;
17:     logger.debug("Created client for url: {}", serviceUrl);
18: }
19: 
20: @Override
21: public EurekaHttpResponse<Void> register(InstanceInfo info) {
22:     // 设置 请求地址
23:     String urlPath = "apps/" + info.getAppName();
24:     ClientResponse response = null;
25:     try {
26:         Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
27:         // 设置 请求头
28:         addExtraHeaders(resourceBuilder);
29:         // 请求 Eureka-Server
30:         response = resourceBuilder
31:                 .header("Accept-Encoding", "gzip") // GZIP
32:                 .type(MediaType.APPLICATION_JSON_TYPE) // 请求参数格式 JSON
33:                 .accept(MediaType.APPLICATION_JSON) // 响应结果格式 JSON
34:                 .post(ClientResponse.class, info); // 请求参数
35:         // 创建 EurekaHttpResponse
36:         return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
37:     } finally {
38:         if (logger.isDebugEnabled()) {
39:             logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
40:                     response == null ? "N/A" : response.getStatus());
41:         }
42:         if (response != null) {
43:             response.close();
44:         }
45:     }
46: }
  • jerseyClient 属性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(...) 方法,获取 ApacheHttpClient4 。
  • serviceUrl 属性,请求的 Eureka-Server 地址。
  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似

    • 第 22 至 26 行 :设置请求地址。
    • 第 28 行 :调用 #addExtraHeaders(...) 方法,设置请求头( header )。该方法是抽象方法,提供子类实现自定义的请求头。代码如下:

      protected abstract void addExtraHeaders(Builder webResource);
      • x
        • 第 29 至 34 行 :请求 Eureka-Server 。
        • 第 35 至 36 行 :解析响应结果,创建 EurekaHttpResponse 。

4.1 JerseyApplicationClient

com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient ,实现 Eureka-Client 请求 Eureka-Server 的网络通信。点击 链接 查看带中文注释的 JerseyApplicationClient。

4.1.1 JerseyEurekaHttpClientFactory

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory ,创建 JerseyApplicationClient 的工厂类。实现代码如下:

public class JerseyEurekaHttpClientFactory implements TransportClientFactory {
    
    private final EurekaJerseyClient jerseyClient;
    private final ApacheHttpClient4 apacheClient;
    private final ApacheHttpClientConnectionCleaner cleaner;
    private final Map<String, String> additionalHeaders;
    
    public JerseyEurekaHttpClientFactory(ApacheHttpClient4 apacheClient, long connectionIdleTimeout, Map<String, String> additionalHeaders) {
        this(null, apacheClient, connectionIdleTimeout, additionalHeaders);
    }
    private JerseyEurekaHttpClientFactory(EurekaJerseyClient jerseyClient,
                                          ApacheHttpClient4 apacheClient,
                                          long connectionIdleTimeout,
                                          Map<String, String> additionalHeaders) {
        this.jerseyClient = jerseyClient;
        this.apacheClient = jerseyClient != null ? jerseyClient.getClient() : apacheClient;
        this.additionalHeaders = additionalHeaders;
        this.cleaner = new ApacheHttpClientConnectionCleaner(this.apacheClient, connectionIdleTimeout);
    }
    
    @Override
    public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
        return new JerseyApplicationClient(apacheClient, endpoint.getServiceUrl(), additionalHeaders);
    }
    @Override
    public void shutdown() {
        cleaner.shutdown();
        if (jerseyClient != null) {
            jerseyClient.destroyResources();
        } else {
            apacheClient.destroy();
        }
    }
}

4.1.2 JerseyEurekaHttpClientFactoryBuilder

JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 内部类,用于创建 JerseyEurekaHttpClientFactory 。点击 链接 查看带中文注释的 JerseyEurekaHttpClientFactory。

调用 JerseyEurekaHttpClientFactory#create(...) 方法,创建 JerseyEurekaHttpClientFactory ,实现代码如下:

public static JerseyEurekaHttpClientFactory create(EurekaClientConfig clientConfig,
                                                  Collection<ClientFilter> additionalFilters,
                                                  InstanceInfo myInstanceInfo,
                                                  AbstractEurekaIdentity clientIdentity) {
   JerseyEurekaHttpClientFactoryBuilder clientBuilder = newBuilder()
           .withAdditionalFilters(additionalFilters) // 客户端附加过滤器
           .withMyInstanceInfo(myInstanceInfo) // 应用实例
           .withUserAgent("Java-EurekaClient") // UA
           .withClientConfig(clientConfig)
           .withClientIdentity(clientIdentity);
   // 设置 Client Name
   if ("true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"))) {
       clientBuilder.withClientName("DiscoveryClient-HTTPClient-System").withSystemSSLConfiguration();
   } else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) {
       clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
               .withProxy(
                       clientConfig.getProxyHost(), Integer.parseInt(clientConfig.getProxyPort()),
                       clientConfig.getProxyUserName(), clientConfig.getProxyPassword()
               ); // http proxy
   } else {
       clientBuilder.withClientName("DiscoveryClient-HTTPClient");
   }
   return clientBuilder.build();
}
public static JerseyEurekaHttpClientFactoryBuilder newBuilder() {
   return new JerseyEurekaHttpClientFactoryBuilder().withExperimental(false);
}

4.2 JerseyReplicationClient

com.netflix.eureka.transport.JerseyReplicationClient ,Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信。

4.2.1 没有工厂

JerseyReplicationClient 没有专属的工厂

调用 JerseyReplicationClient#createReplicationClient(...) 静态方法,创建 JerseyReplicationClient 。点击 链接 查看带中文注释的方法代码。

5. EurekaHttpClientDecorator

com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator,EurekaHttpClient 委托者抽象类。实现代码如下:

public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
    /**
     * 执行请求
     *
     * @param requestExecutor 请求执行器
     * @param <R> 请求泛型
     * @return 响应
     */
    protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor);
    
    @Override
    public EurekaHttpResponse<Void> register(final InstanceInfo info) {
        return execute(new RequestExecutor<Void>() {
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                return delegate.register(info);
            }
            @Override
            public RequestType getRequestType() {
                return RequestType.Register;
            }
        });
    }
}
  • #execute(...) 抽象方法,子类实现该方法,实现自己的特性。
  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似
  • RequestType ,请求类型枚举类。代码如下:

    // EurekaHttpClientDecorator.java
    public enum RequestType {
       Register,
       Cancel,
       SendHeartBeat,
       StatusUpdate,
       DeleteStatusOverride,
       GetApplications,
       GetDelta,
       GetVip,
       GetSecureVip,
       GetApplication,
       GetInstance,
       GetApplicationInstance
    }
  • RequestExecutor ,请求执行器接口。接口代码如下:

    // EurekaHttpClientDecorator.java
    public interface RequestExecutor<R> {
       /**
        * 执行请求
        *
        * @param delegate 委托的 EurekaHttpClient
        * @return 响应
        */
       EurekaHttpResponse<R> execute(EurekaHttpClient delegate);
       /**
        * @return 请求类型
        */
       RequestType getRequestType();
    }

EurekaHttpClientDecorator 的每个实现类实现一个特性,代码非常非常非常清晰。

FROM 《委托模式》
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。

我们在上图的基础上,增加委托的关系,如下图( 打开大图 ):

  • 请注意,每个委托着实现类,上面可能有类型为 EurekaHttpClientFactory 的属性,用于创建其委托的 EurekaHttpClient 。为什么会有 Factory ?例如,RetryableEurekaHttpClient 重试请求多个 Eureka-Server 地址时,每个 Eureka-Server 地址会创建一个 EurekaHttpClient 。所以,下文涉及到 EurekaHttpClientFactory 和委托的 EurekaHttpClient 的地方,你都需要仔细理解。

5.1 MetricsCollectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient ,监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。

#execute() 方法,代码如下:

 1: @Override
 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
 3:     // 获得 请求类型 的 请求指标
 4:     EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
 5:     Stopwatch stopwatch = requestMetrics.latencyTimer.start();
 6:     try {
 7:         // 执行请求
 8:         EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
 9:         // 增加 请求指标
10:         requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
11:         return httpResponse;
12:     } catch (Exception e) {
13:         requestMetrics.connectionErrors.increment();
14:         exceptionsMetric.count(e);
15:         throw e;
16:     } finally {
17:         stopwatch.stop();
18:     }
19: }
  • 第 10 行 :调用 RequestExecutor#execute(...) 方法,继续执行请求。
    • delegate 属性,对应 JerseyApplicationClient 。

5.2 RedirectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient寻找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。

#execute() 方法,代码如下:

 1: @Override
 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
 3:     EurekaHttpClient currentEurekaClient = delegateRef.get();
 4:     if (currentEurekaClient == null) { // 未找到非 302 的 Eureka-Server
 5:         AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
 6:         try {
 7:             EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
 8:             // 关闭原有的委托 EurekaHttpClient ,并设置当前成功非 302 请求的 EurekaHttpClient
 9:             TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
10:             return response;
11:         } catch (Exception e) {
12:             logger.error("Request execution error", e);
13:             TransportUtils.shutdown(currentEurekaClientRef.get());
14:             throw e;
15:         }
16:     } else { // 已经找到非 302 的 Eureka-Server
17:         try {
18:             return requestExecutor.execute(currentEurekaClient);
19:         } catch (Exception e) {
20:             logger.error("Request execution error", e);
21:             delegateRef.compareAndSet(currentEurekaClient, null);
22:             currentEurekaClient.shutdown();
23:             throw e;
24:         }
25:     }
26: }
  • 注意:和我们理解的常规的 302 状态返回处理不同!!!
  • 整个分成两部分:【第 4 至 15 行】、【第 16 至 24 行】。
    • 前者,意味着未找到非返回 302 状态码的 Eureka-Server ,此时通过在原始传递进来的 serviceUrls 执行请求,寻找非 302 状态码返回的 Eureka-Server。
      • 当返回非 302 状态码时,找到非返回 302 状态码的 Eureka-Server 。
      • 当返回 302 状态码时,向新的重定向的 Eureka-Server 执行请求直到成功找到或超过最大次数。
        • 后者,意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。注意 :此时 Eureka-Server 再返回 302 状态码,不再处理。
        • 目前 Eureka 1.x 的 Eureka-Server 不存在返回 302 状态码,猜测和 Eureka 2.X TODO[0028]:写入集群和读取集群 有关。
  • 【前者】第 5 行 :使用初始的 serviceEndpoint ( 相当于 serviceUrls ) 创建委托 EurekaHttpClient 。
  • 【前者】第 7 行 :调用 #executeOnNewServer(...) 方法,通过执行请求的方式,寻找非 302 状态码返回的 Eureka-Server。实现代码,点击 链接 查看带中文注释的代码实现。
  • 【前者】【前者】第 9 行 :关闭原有的 delegateRef ( 因为此处可能存在并发,多个线程都找到非 302 状态码返回的 Eureka-Server ),并设置当前成功非 302 请求的 EurekaHttpClient 到 delegateRef
  • 【前者】第 13 行 :关闭 currentEurekaClientRef ,当请求发生异常或者超过最大重定向次数。
  • 【后者】第 18 行 :意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。
  • 【后者】第 21 至 22 行 :执行请求发生异常,关闭 currentEurekaClient ,后面要重新非返回 302 状态码的 Eureka-Server 。

5.2.1 工厂

RedirectingEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.3 RetryableEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient ,支持向多个 Eureka-Server 请求重试的 EurekaHttpClient 。

#execute() 方法,代码如下:

 1: @Override
 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
 3:     List<EurekaEndpoint> candidateHosts = null;
 4:     int endpointIdx = 0;
 5:     for (int retry = 0; retry < numberOfRetries; retry++) {
 6:         EurekaHttpClient currentHttpClient = delegate.get();
 7:         EurekaEndpoint currentEndpoint = null;
 8: 
 9:         // 当前委托的 EurekaHttpClient 不存在
10:         if (currentHttpClient == null) {
11:             // 获得候选的 Eureka-Server 地址数组
12:             if (candidateHosts == null) {
13:                 candidateHosts = getHostCandidates();
14:                 if (candidateHosts.isEmpty()) {
15:                     throw new TransportException("There is no known eureka server; cluster server list is empty");
16:                 }
17:             }
18: 
19:             // 超过候选的 Eureka-Server 地址数组上限
20:             if (endpointIdx >= candidateHosts.size()) {
21:                 throw new TransportException("Cannot execute request on any known server");
22:             }
23: 
24:             // 创建候选的 EurekaHttpClient
25:             currentEndpoint = candidateHosts.get(endpointIdx++);
26:             currentHttpClient = clientFactory.newClient(currentEndpoint);
27:         }
28: 
29:         try {
30:             // 执行请求
31:             EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
32:             // 判断是否为可接受的相应,若是,返回。
33:             if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
34:                 delegate.set(currentHttpClient);
35:                 if (retry > 0) {
36:                     logger.info("Request execution succeeded on retry #{}", retry);
37:                 }
38:                 return response;
39:             }
40:             logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
41:         } catch (Exception e) {
42:             logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
43:         }
44: 
45:         // 请求失败,若是 currentHttpClient ,清除 delegate
46:         // Connection error or 5xx from the server that must be retried on another server
47:         delegate.compareAndSet(currentHttpClient, null);
48: 
49:         // 请求失败,将 currentEndpoint 添加到隔离集合
50:         if (currentEndpoint != null) {
51:             quarantineSet.add(currentEndpoint);
52:         }
53:     }
54:     throw new TransportException("Retry limit reached; giving up on completing the request");
55: }
  • 第 10 行 :当前 currentHttpClient 不存在,意味着原有 delegate 不存在向 Eureka-Server 成功请求的 EurekaHttpClient 。
    • 此时需要从配置中的 Eureka-Server 数组重试请求,获得可以请求的 Eureka-Server 。
    • 如果已经存在请求成功的 delegate ,直接使用它进行执行请求。
  • 第 11 至 17 行 :调用 #getHostCandidates() 方法,获得候选的 Eureka-Server serviceUrls 数组。实现代码如下:

     1: private List<EurekaEndpoint> getHostCandidates() {
     2:     // 获得候选的 Eureka-Server 地址数组
     3:     List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
     4: 
     5:     // 保留交集(移除 quarantineSet 不在 candidateHosts 的元素)
     6:     quarantineSet.retainAll(candidateHosts);
     7: 
     8:     // 在保证最小可用的候选的 Eureka-Server 地址数组,移除在隔离集合内的元素
     9:     // If enough hosts are bad, we have no choice but start over again
    10:     int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage()); // 0.66
    11:     if (quarantineSet.isEmpty()) {
    12:         // no-op
    13:     } else if (quarantineSet.size() >= threshold) {
    14:         logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
    15:         quarantineSet.clear();
    16:     } else {
    17:         List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
    18:         for (EurekaEndpoint endpoint : candidateHosts) {
    19:             if (!quarantineSet.contains(endpoint)) {
    20:                 remainingHosts.add(endpoint);
    21:             }
    22:         }
    23:         candidateHosts = remainingHosts;
    24:     }
    25: 
    26:     return candidateHosts;
    27: }
    • 第 3 行 :调用 ClusterResolver#getClusterEndpoints() 方法,获得候选的 Eureka-Server 地址数组( candidateHosts )。注意:该方法返回的 Eureka-Server 地址数组,使用以本机 IP 为随机种子,达到不同 IP 的应用实例获得的数组顺序不同,而相同 IP 的应用实例获得的数组顺序一致,效果类似基于 IP HASH 的负载均衡算法。实现该功能的代码,在 《Eureka 源码解析 —— EndPoint 与 解析器》搜索关键字【ResolverUtils#randomize(…)】 详细解析。
    • 第 6 行 :调用 Set#retainAll() 方法,移除隔离的故障 Eureka-Server 地址数组( quarantineSet ) 中不在 candidateHosts 的元素。
    • 第 8 至 24 行 :在保证最小可用的 candidateHosts,移除在 quarantineSet 的元素。
      • 第 10 行 :最小可用的阀值,配置 eureka.retryableClientQuarantineRefreshPercentage 来设置百分比,默认值:0.66
      • 最 13 至 15 行 :quarantineSet 数量超过阀值,清空 quarantineSet ,全部 candidateHosts 重试。
      • 第 17 至 24 行 :quarantineSet 数量未超过阀值,移除 candidateHosts 中在 quarantineSet 的元素。
  • 第 19 至 22 行 :超过 candidateHosts 上限,全部 Eureka-Server 请求失败,抛出异常。

  • 第 24 至 26 行 :创建委托的 EurekaHttpClient ,用于下面请求执行。
  • 第 31 行 :执行请求。
  • 第 33 行 :调用 ServerStatusEvaluator#accept() 方法,判断响应状态码和请求类型是否能够接受。实现代码如下:

    // ServerStatusEvaluators.java
    private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
       @Override
       public boolean accept(int statusCode, RequestType requestType) {
           if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
               return true;
           } else if (requestType == RequestType.Register && statusCode == 404) { // 注册,404 可接受
               return true;
           } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) { // 心跳,404 可接受
               return true;
           } else if (requestType == RequestType.Cancel) {  // cancel is best effort 下线,接受全部
               return true;
           } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) { // 增量获取注册信息,403 404 可接受
               return true;
           }
           return false;
       }
    };
  • 第 34 行 :请求成功,设置 delegate 。下次请求,优先使用 delegate ,失败才进行候选的 Eureka-Server 地址数组重试。

  • 第 47 行 :请求失败,delegate 若等于 currentHttpClient ,进行清除。
  • 第 50 至 52 行 :请求失败,将请求的 Eureka-Server 地址添加到 quarantineSet
  • 总结来说:
    • 【第一步】若当前有请求成功的 EurekaHttpClient ,继续使用。若请求失败,执行【第二步】。
    • 【第二步】若当前无请求成功的 EurekaHttpClient ,获取候选的 Eureka-Server 地址数组顺序创建新的 EurekaHttpClient,直到成功,或者超过最大重试次数。当请求成功,保存该 EurekaHttpClient ,下次继续使用,直到请求失败。

5.3.1 工厂

RetryableEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.4 SessionedEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient ,支持会话的 EurekaHttpClient 。执行定期的重建会话,防止一个 Eureka-Client 永远只连接一个特定的 Eureka-Server 。反过来,这也保证了 Eureka-Server 集群变更时,Eureka-Client 对 Eureka-Server 连接的负载均衡。

#execute(...) ,代码如下:

 1: @Override
 2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
 3:     long now = System.currentTimeMillis();
 4:     long delay = now - lastReconnectTimeStamp;
 5: 
 6:     // 超过 当前会话时间,关闭当前委托的 EurekaHttpClient 。
 7:     if (delay >= currentSessionDurationMs) {
 8:         logger.debug("Ending a session and starting anew");
 9:         lastReconnectTimeStamp = now;
10:         currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
11:         TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
12:     }
13: 
14:     // 获得委托的 EurekaHttpClient 。若不存在,则创建新的委托的 EurekaHttpClient 。
15:     EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
16:     if (eurekaHttpClient == null) {
17:         eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
18:     }
19:     return requestExecutor.execute(eurekaHttpClient);
20: }
  • 第 7 至 12 行 :超过当前会话时间,关闭当前委托的 EurekaHttpClient 。

    • 第 10 行 :调用 #randomizeSessionDuration(...) 方法,计算计算下一次会话超时时长,公式为 sessionDurationMs * (0.5, 1.5) ,代码如下:

      protected long randomizeSessionDuration(long sessionDurationMs) {
         long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5));
         return sessionDurationMs + delta;
      }
      • 增加会话过期的随机性,实现所有 Eureka-Client 的会话过期重连的发生时间更加离散,避免集中时间过期。目前猜测这么做的目的和 TODO[0028]:写入集群和读取集群 有关,即返回 302 。关联 1.x new transport enhancements
  • 第 15 至 18 行 :获得委托的 EurekaHttpClient 。若不存在,创建新的委托的 EurekaHttpClient 。TransportUtils#getOrSetAnotherClient(...) 方法代码如下:

     1: public static EurekaHttpClient getOrSetAnotherClient(AtomicReference<EurekaHttpClient> eurekaHttpClientRef, EurekaHttpClient another) {
     2:     EurekaHttpClient existing = eurekaHttpClientRef.get();
     3:     // 为空才设置
     4:     if (eurekaHttpClientRef.compareAndSet(null, another)) {
     5:         return another;
     6:     }
     7:     // 设置失败,意味着另外一个线程已经设置
     8:     another.shutdown();
     9:     return existing;
    10: }
    • 该方法实现,获得 eurekaHttpClientRef 里的 EurekaHttpClient 。若获取不到,将 another 设置到 eurekaHttpClientRef 。当有多个线程设置时,有且只有一个线程设置成功,另外的设置失败的线程们,意味着当前 eurekaHttpClientRef 有 EurekaHttpClient ,返回 eurekaHttpClientRef
    • 目前该方法存在 BUG ,失败的线程直接返回 existing 的是 null ,需要修改成 return eurekaHttpClientRef.get() 。模拟重现该 BUG 代码如下 :

  • 第 19 行 :执行请求。

5.4.1 没有工厂

在 SessionedEurekaHttpClient 类里,没有实现创建其的工厂。在 「6. 创建网络通讯客户端」搜索 canonicalClientFactory ,可以看到 EurekaHttpClients#canonicalClientFactory(...) 方法,内部有 SessionedEurekaHttpClient 的创建工厂。

6. 创建网络通讯客户端

对于 Eureka-Server 来说,调用 JerseyReplicationClient#createReplicationClient(...) 静态方法即可创建用于 Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信客户端。

对于 Eureka-Client 来说,分成用于注册应用实例( registrationClient )和查询注册信息( newQueryClient )的两个不同网络通信客户端。在 DiscoveryClient 初始化时进行创建,代码如下:

// DiscoveryClient.class
  1: private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
  2:                                         AbstractDiscoveryClientOptionalArgs args) {
  3:         
  4:     Collection<?> additionalFilters = args == null
  5:             ? Collections.emptyList()
  6:             : args.additionalFilters;
  7: 
  8:     EurekaJerseyClient providedJerseyClient = args == null
  9:             ? null
 10:             : args.eurekaJerseyClient;
 11:     
 12:     TransportClientFactories argsTransportClientFactories = null;
 13:     if (args != null && args.getTransportClientFactories() != null) {
 14:         argsTransportClientFactories = args.getTransportClientFactories();
 15:     }
 16:     
 17:     // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
 18:     @SuppressWarnings("rawtypes")
 19:     TransportClientFactories transportClientFactories = argsTransportClientFactories == null
 20:             ? new Jersey1TransportClientFactories()
 21:             : argsTransportClientFactories;
 22: 
 23:     // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
 24:     // noinspection unchecked
 25:     eurekaTransport.transportClientFactory = providedJerseyClient == null
 26:             ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
 27:             : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
 28: 
 29:     // (省略代码)初始化 应用解析器的应用实例数据源 TODO[0028]写入集群和读取集群
 30: 
 31:     // (省略代码)创建 EndPoint 解析器
 32:     eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(...)
 33: 
 34:     if (clientConfig.shouldRegisterWithEureka()) {
 35:         EurekaHttpClientFactory newRegistrationClientFactory = null;
 36:         EurekaHttpClient newRegistrationClient = null;
 37:         try {
 38:             newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
 39:                     eurekaTransport.bootstrapResolver,
 40:                     eurekaTransport.transportClientFactory,
 41:                     transportConfig
 42:             );
 43:             newRegistrationClient = newRegistrationClientFactory.newClient();
 44:         } catch (Exception e) {
 45:             logger.warn("Transport initialization failure", e);
 46:         }
 47:         eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
 48:         eurekaTransport.registrationClient = newRegistrationClient;
 49:     }
 50: 
 51:     // new method (resolve from primary servers for read)
 52:     // Configure new transport layer (candidate for injecting in the future)
 53:     if (clientConfig.shouldFetchRegistry()) {
 54:         EurekaHttpClientFactory newQueryClientFactory = null;
 55:         EurekaHttpClient newQueryClient = null;
 56:         try {
 57:             newQueryClientFactory = EurekaHttpClients.queryClientFactory(
 58:                     eurekaTransport.bootstrapResolver,
 59:                     eurekaTransport.transportClientFactory,
 60:                     clientConfig,
 61:                     transportConfig,
 62:                     applicationInfoManager.getInfo(),
 63:                     applicationsSource
 64:             );
 65:             newQueryClient = newQueryClientFactory.newClient();
 66:         } catch (Exception e) {
 67:             logger.warn("Transport initialization failure", e);
 68:         }
 69:         eurekaTransport.queryClientFactory = newQueryClientFactory;
 70:         eurekaTransport.queryClient = newQueryClient;
 71:     }
 72: }
  • 第 18 至 27 行 :调用 Jersey1TransportClientFactories#newTransportClientFactory(...) 方法,创建 registrationClientqueryClient 公用的委托的 EurekaHttpClientFactory ,代码如下:

    // Jersey1TransportClientFactories.java
    public TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig,
                                                                  final Collection<ClientFilter> additionalFilters,
                                                                  final InstanceInfo myInstanceInfo) {
       // JerseyEurekaHttpClientFactory
       final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create(
               clientConfig,
               additionalFilters,
               myInstanceInfo,
               new EurekaClientIdentity(myInstanceInfo.getIPAddr())
       );
       // TransportClientFactory
       final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory); // 委托 TransportClientFactory
       return new TransportClientFactory() {
           @Override
           public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
               return metricsFactory.newClient(serviceUrl);
           }
           @Override
           public void shutdown() {
               metricsFactory.shutdown();
               jerseyFactory.shutdown();
           }
       };
    }
    • 在 TransportClientFactory 里委托 JerseyEurekaHttpClientFactory 。
  • 第 34 至 49 行 :调用 EurekaHttpClients#registrationClientFactory(...) 方法,创建 registrationClient 的 EurekaHttpClientFactory ,代码如下 :

    // EurekaHttpClients.java
    public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver,
                                                                    TransportClientFactory transportClientFactory,
                                                                    EurekaTransportConfig transportConfig) {
        return canonicalClientFactory(EurekaClientNames.REGISTRATION, transportConfig, bootstrapResolver, transportClientFactory);
    }
    static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                         final EurekaTransportConfig transportConfig,
                                                         final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                         final TransportClientFactory transportClientFactory) {
       return new EurekaHttpClientFactory() { // SessionedEurekaHttpClientFactory
           @Override
           public EurekaHttpClient newClient() {
               return new SessionedEurekaHttpClient(
                       name,
                       RetryableEurekaHttpClient.createFactory( // RetryableEurekaHttpClient
                               name,
                               transportConfig,
                               clusterResolver,
                               RedirectingEurekaHttpClient.createFactory(transportClientFactory), // RedirectingEurekaHttpClient
                               ServerStatusEvaluators.legacyEvaluator()),
                       transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
               );
           }
           @Override
           public void shutdown() {
               wrapClosable(clusterResolver).shutdown();
           }
       };
    }
  • 第 51 至 71 行 :调用 EurekaHttpClients#queryClientFactory(...) 方法,创建 queryClient 的 EurekaHttpClientFactory ,代码如下 :

    // EurekaHttpClients.java
    public static EurekaHttpClientFactory queryClientFactory(ClusterResolver bootstrapResolver,
                                                            TransportClientFactory transportClientFactory,
                                                            EurekaClientConfig clientConfig,
                                                            EurekaTransportConfig transportConfig,
                                                            InstanceInfo myInstanceInfo,
                                                            ApplicationsResolver.ApplicationsSource applicationsSource) {
       ClosableResolver queryResolver = transportConfig.useBootstrapResolverForQuery()
               ? wrapClosable(bootstrapResolver)
               : queryClientResolver(bootstrapResolver, transportClientFactory,
               clientConfig, transportConfig, myInstanceInfo, applicationsSource);
       return canonicalClientFactory(EurekaClientNames.QUERY, transportConfig, queryResolver, transportClientFactory); // 该方法上面有
    }

666. 彩蛋

知识星球

这次真的是彩蛋,我们将整体调用关系调整如下如下( 打开大图 ):

胖友,你学会了么?

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?