阅读 775

Spring Cloud-Eureka Client 原理解析

前面一些 demo 中已经介绍了如何使用 SOFABoot 来集成 Spring Cloud Netflix Eureka 组件。本篇将来先解析下 Eureka Client 的工作原理。

Netflix 和 SpringCloud

spring-cloud-commons 模块是 spring 在分布式领域上(服务发现,服务注册,断路器,负载均衡)的规范定义。spring-cloud-netflix 是基于此规范的具体实现,Netflix OSS 里的各种组件也都实现了这个 commons 规范。关系如下:

image.png

Spring Cloud Netflix Eureka 服务发现实现原理

基于上图,这里以 Eureka 中的服务发现为例,来具体讲下是如何实现的。Spring Cloud common 中提供了用于服务发现的两个关键类:DiscoveryClient 接口 和 EnableDiscoveryClient 注解。

DiscoveryClient 接口

下面这张图描述的是在服务发现这个功能上,SpringCloud 是如何与 Netflix 整合的。 在 spring-cloud-netflix-eureka-client 中对 Spring Cloud Common 中的 DiscoveryClient 接口进行了实现,实现类是 EurekaDiscoveryClient 。

image.png

DiscoveryClient 的接口定义与方法:

/**
 * DiscoveryClient表示服务发现常用的读取操作,例如Netflix Eureka或consul.io
 * @author Spencer Gibb
 */
public interface DiscoveryClient {

	/**
	 * 实现描述
	 * @return the description
	 */
	String description();

	/**
	 * 获取与特定serviceId关联的所有ServiceInstances
	 * @param serviceId the serviceId to query
	 * @return a List of ServiceInstance
	 */
	List<ServiceInstance> getInstances(String serviceId);

	/**
	 * 返回所有已知的服务ID
	 */
	List<String> getServices();
}
复制代码

EurekaDiscoveryClient 中实现了这几个方法,但是 EurekaDiscoveryClient 自身没有实现如何与服务端交互的逻辑,而是通过 com.netflix.DiscoveryClient 类来完成。所以 spring-cloud-netflix-eureka-client 干的事情就是实现了 Spring Cloud Common 规范,然后在实现上包装了 netflix 。

@EnableDiscoveryClient 注解


```java @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { //是否自动注册,默认是true。 boolean autoRegister() default true; } ```

EnableDiscoveryClientImportSelector 将会从 META-INF/spring.factories 里找出 key 为org.springframework.cloud.client.discovery.EnableDiscoveryClient 的类。

对于 autoRegister :

  • 如果自动注册属性为true,会在找出的这些类里再加上一个类:AutoServiceRegistrationConfiguration, AutoServiceRegistrationConfiguration 内部会使用@EnableConfigurationProperties(AutoServiceRegistrationProperties.class) 触发构造AutoServiceRegistrationProperties 这个 bean。像eureka,nacos,它们的自动化配置类里都使用了@ConditionalOnBean(AutoServiceRegistrationProperties.class) 来确保存在AutoServiceRegistrationProperties 这个 bean 存在的时候才会构造 AutoServiceRegistration 进行注册。
  • 如果自动注册属性为 false,在Environment 里加一个 PropertySource,内部的配置项是spring.cloud.service-registry.auto-registration.enabled,值是false(代表不构造AutoServiceRegistrationProperties.class)。这样 eureka 就不会注册。

对应上面这段逻辑的代码如下:

image.png

spring-cloud-netflix-eureka-client 自己也提供了一个注解 EnableEurekaClient,其作用于这个注解一样

Eureka 架构图

image.png

  • consumer  : 服务消费方,eureka client 角色,可以从 eureka server 上拉取到其他已注册服务的信息,从而根据这些信息找到自己所需的服务,然后发起远程调用。
  • provider : 服务提供方,eureka client 角色,可以向 eureka server 上注册和更新自己的信息,当然作为 eureka client ,它也可以从server 上获取到其他服务的信息。
  • Eureka server : 服务注册中心,提供服务注册和服务发现功能;
  • 同步复制 : eureka server 之间进行注册服务信息的同步,这样可以保证集群中每个server 都能提供完整的服务信息。

关于 AWS 上 Regin 和 Availability Zone 的概念,请自行查阅相关资料

源码解析

配置信息读取

Eureka Client的自动配置类是 org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration ,这里面主要就负责了一些配置信息的服务诸如 DiscoveryClient 、EurekaServiceRegistry等主要bean的初始化工作。

另外还有一个 EurekaDiscoveryClientConfiguration 类,负责配置自动注册和应用的健康检查器初始化。

读取 eureka.client.*

@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
	EurekaClientConfigBean client = new EurekaClientConfigBean();
  if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
    // 默认情况下,我们不会在引导过程中注册,但是以后会有另一个机会。
    client.setRegisterWithEureka(false);
  }
  return client;
}
复制代码

EurekaClientConfigBean 封装的是 eureka client 和 eureka server 交互所需要的配置信息,比如前面demo工程中的 eureka.client.service-url.defaultZone 的配置。

读取 eureka.instance.*

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
	ManagementMetadataProvider managementMetadataProvider) {
  // 代码较长,此处省略
}
复制代码

EurekaInstanceConfigBean 封装的是 eureka client 自身实例的配置信息,提供服务注册的基本元数据信息。

核心组件 bean 初始化

这里也实例化了一些核心的组件bean。

ApplicationInfoManager

  • EurekaClientConfiguration#eurekaApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
  InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
  return new ApplicationInfoManager(config, instanceInfo);
}
复制代码
  • RefreshableEurekaClientConfiguration#eurekaApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
  InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
  return new ApplicationInfoManager(config, instanceInfo);
}
复制代码

RefreshScope ,被此注解标注的情况下,将会被动态刷新。包括属性信息等,注意,对于动态刷新,被RefreshScope标记的类不能是final的。

ApplicationInfoManager 是应用信息管理器,用于管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig 。

DiscoveryClient

@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
	return new EurekaDiscoveryClient(config, client);
}
复制代码

DiscoveryClient ,前面说到,这个类是Spring Cloud 中用于服务发现使用的客户端接口。注意这里是SpringCloud提供的接口,不是netflix中的类。

EurekaServiceRegistry

@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
	return new EurekaServiceRegistry();
}
复制代码

EurekaServiceRegistry 是 ServiceRegistry 的实现类。ServiceRegistry 是 SpringCloud 提供了注册和注销等方法,这些方法允许用户提供自定义注册服务。

EurekaRegistration

@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {
		return EurekaRegistration.builder(instanceConfig)
				.with(applicationInfoManager)
				.with(eurekaClient)
				.with(healthCheckHandler)
				.build();
	}
复制代码

每个 ServiceRegistry 实现都有自己的 Registry 实现。

  • ZookeeperRegistration -> ZookeeperServiceRegistry
  • ZookeeperRegistration -> EurekaServiceRegistry
  • ConsulRegistration       -> ConsulServiceRegistry

如果你需要自定义实现 ServiceRegistry ,则也不要提供一个 Registration  的实现。

服务发现

服务发现的基本情况在上面已经提到了,但是由于 SpingCloud 中并没有提供具体的交互操作而是由 com.netflix.discovery.DiscoveryClient 来完成具体工作。所以关于服务服务发现这里就直接围绕这个类来展开。

image.png

LookopService

public interface LookupService<T> {
    // 根据服务实例注册的appName 来获取 Application
    Application getApplication(String appName);
    // 返回当前注册表中所有的服务实例信息
    Applications getApplications();
    // 根据服务实例Id获取服务实例信息
    List<InstanceInfo> getInstancesById(String id);
    /**
     * 获取下一个可能的服务器,以处理来自从eureka接收到的注册表信息的请求。
     * @virtualHostname 与服务器关联的虚拟主机名。
     * @secure 指示是HTTP还是HTTPS请求
     *
     */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
复制代码

LookupService 接口的作用就是用于查找活动服务实例;总共提供了四个方法,很好理解。每个方法的作用见注释。

EurekaClient

EurekaClient 也是一个接口,集成并且扩展了 LookupService。

This interface does NOT try to clean up the current client interface for eureka 1.x. Rather it tries to provide an easier transition path from eureka 1.x to eureka 2.x. 从这来看,EurekaClient 的存在是为了给 Eureka1.x 向 Eureka 2.x 升级提供容错能力。

EurekaClient 在 LookupService 基础上扩展了很多方法,如下:

public interface EurekaClient extends LookupService {
  	// 省去@Deprecated方法和获取服务实例信息的接口方法
		// 注册健康检查处理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
		// 监听client服务信息的更新
    public void registerEventListener(EurekaEventListener eventListener);
   	// 取消监听
    public boolean unregisterEventListener(EurekaEventListener eventListener);
 		// 获取当前健康检查处理器
    public HealthCheckHandler getHealthCheckHandler();
		// 关闭 eureka 客户端。还向eureka服务器发送撤销注册请求。
    public void shutdown();
  	// EurekaClientConfig
    public EurekaClientConfig getEurekaClientConfig();
 		// ApplicationInfoManager
    public ApplicationInfoManager getApplicationInfoManager();
}
复制代码

HealthCheckHandler 这个是用于检查当前客户端状态的,这个在后面心跳机制里面会说道。

DiscoveryClient

com.netflix.discovery.DiscoveryClient,这个类会在构造函数中完成一系列重要的操作,如:拉取注册表信息,服务注册,初始化心跳机制,缓存刷新,按需注册定时任务等等。

 DiscoveryClient(ApplicationInfoManager applicationInfoManager, 
 								 EurekaClientConfig config, 
                 AbstractDiscoveryClientOptionalArgs args,
                 Provider<BackupRegistry> backupRegistryProvider) {
 // ... 
 }
复制代码

几个参数的释义如下:

  • applicationInfoManager :应用信息管理器
  • config :client 与 server 交互的配置信息
  • args :客户端提供的过滤器类型(支持jersey1和jersey2),后面用来构建 EurekaTransport
  • backupRegistryProvider : 备份注册中心

服务发现

下面代码片段也是在 DiscoveryClient 的构造函数里面的,这里就是拉取注册服务信息的逻辑:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
	fetchRegistryFromBackup();
}
复制代码

clientConfig.shouldFetchRegistry() 这个方法拿到的就是配置文件中 eureka.client.fetch-registry 的值,默认为true,表示从 eureka server 拉取注册表信息。

fetchRegistry(boolean)是从 eureka server 拉取注册信息的方法,参数用于表示是否是强制拉取全量的注册信息;此方法除非在协调eureka服务器和客户端注册表信息方面存在问题,否则此方法只尝试在第一次进行全量获取,后面均是增量获取。

fetchRegistryFromBackup() 如果 eureka server 服务不可用,则采用的备用方案。

底层通信实现 EurekaTransport

EurekaTransport 是 DiscoveryClient 的内部类,EurekaTransport 封装了具体的基于 jersey 的底层通信实现。

FetchRegistry

image.png
上图为拉取注册信息的整个过程。对于黄色贴条上的条件,如果满足其中一个,则都会进行全量拉取;否则进行增量拉取。计算 hash 值是为了后面可以与server端应用信息的进行对比,用于感知是否需要重新进行拉取操作。

服务注册

服务注册逻辑也是在 DiscoveryClient 的构造函数中完成,代码片段如下:

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
  try {
    if (!register() ) {
    throw new IllegalStateException("Registration error at startup. Invalid server response.");
  	}
  } catch (Throwable th) {
    logger.error("Registration error at startup: {}", th.getMessage());
    throw new IllegalStateException(th);
  }
}
复制代码

向server端注册需要满足的两个条件是:1、允许向server端注册  2、是否在客户端初始化期间强制注册

boolean register() throws Throwable {
  logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  EurekaHttpResponse<Void> httpResponse;
  try {
  	httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  } catch (Exception e) {
    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
    throw e;
  }
  if (logger.isInfoEnabled()) {
  	logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  }
  return httpResponse.getStatusCode() == 204;
}
复制代码

通过 eurekaTransport 对象,基于 REST 调用向 eureka server 进行服务注册。

心跳机制

心跳机制的初始化工作也是在 DiscoveryClient 构造函数中完成。在DiscoveryClient构造函数的最后,有一个初始化调度任务的方法,在这个方法里就包括心跳的初始化。

heartbeatExecutor 心跳线程池:

heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
复制代码

scheduler 提交周期执行:

// Heartbeat timer
scheduler.schedule(
                  new TimedSupervisorTask(
                  "heartbeat",
                  scheduler,
                  heartbeatExecutor,
                  renewalIntervalInSecs,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  new HeartbeatThread()
                  ),
									renewalIntervalInSecs, TimeUnit.SECONDS);
复制代码

TimedSupervisorTask 是 eureka 中自动调节间隔的周期性任务类。HeartbeatThread 是具体执行任何的线程,run方法中执行的就是 renew() 续期。

boolean renew() {
  EurekaHttpResponse<InstanceInfo> httpResponse;
  try {
    // 通过 eurekaTransport 来与 server 通信续期
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    // 404 标识当前服务实例不存在
    if (httpResponse.getStatusCode() == 404) {
      // 记录心跳次数
      REREGISTER_COUNTER.increment();
      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
      long timestamp = instanceInfo.setIsDirtyWithTime();
      // 重新注册
      boolean success = register();
      if (success) {
      	instanceInfo.unsetIsDirty(timestamp);
      }
    	return success;
    }
    // 200 状态正常
    return httpResponse.getStatusCode() == 200;
  } catch (Throwable e) {
    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    return false;
  }
}
复制代码

服务下线

关闭 eureka client,还向 eureka server 发送撤销注册请求。该方法在DiscoveryClient#shutdown 方法中。

@PreDestroy
    @Override
    public synchronized void shutdown() {
  			// 保证原子操作
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");
            if (statusChangeListener != null && applicationInfoManager != null) {
              	// 应用管理器取消状态监听
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
						// 清理任务调度执行
            cancelScheduledTasks();
            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
              	//设置服务实例状态为 DOWN
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
              	//注销注册
                unregister();
            }
						// 关闭 jersey 客户端
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
复制代码
关注下面的标签,发现更多相似文章
评论