阅读 71

spring cloud 打通全链路上下文 -- 微服务治理的限界上下文

领域驱动设计在微服务中的作用,主要体现在限界上下文中。不同的限界上下文都存在相同的实体领域模型。在微服务拆分时,需要保证各个应用边界的划分,比如一个外卖系统的订单业务与门店、支付的关联性。这就是一次请求所经过的一个链路。其次面向应用的实时性,我们也应该从业务的角度对每个实体进行领域建模。

每个服务主要承担自己独立的业务模型和功能, 如何将请求串联等全链路的应用问题越来越多, 常见的场景问题如下: github 地址:github.com/complone/sp…

  • 网关保存了用户的session和用户信息, 如何让后端无状态的服务知道是哪一个用户操作?
  • 如何通过网关的请求后,到所有的服务能知道是同一条链路
  • 如何从网关或者某个服务开始, 将某个特征数据一直传递下去

链路上下文定义

spring cloud体系下的许多组件已经有自己的上下文实现, , 这里我们仿照zuul的RequestContext定义链路上下文, 主要继承自ConcurrentHashMap, 可以安全存储任何数据, 并采用InheritableThreadLocal保存上下文信息.

public class ChainContextHolder extends ConcurrentHashMap<String, Object> {
 
    protected static Class<? extends ChainContextHolder> contextClass = ChainContextHolder.class;
 
    protected static final ThreadLocal<? extends ChainContextHolder> THREAD_LOCAL = new InheritableThreadLocal<ChainContextHolder>() {
        @Override
        protected ChainContextHolder initialValue() {
            try {
                return contextClass.newInstance();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    };
 
    /**
     * Override the default ChainRequestContext
     *
     * @param clazz
     */
    public static void setContextClass(Class<? extends ChainContextHolder> clazz) {
        contextClass = clazz;
    }
 
    /**
     * Get the current ChainRequestContext
     * 
     * @return the current ChainRequestContext
     */
    public static final ChainContextHolder getCurrentContext() {
        return THREAD_LOCAL.get();
    }
 
    /**
     * Unsets the threadLocal context. Done at the end of the request.
     * 
     * @return
     */
    public void unset() {
        this.clear();
        THREAD_LOCAL.remove();
    }
 
    /**
     * Returns either passed value of the key, or if the value is {@code null}, the value of {@code defaultValue}.
     * 
     * @param key
     * @param defaultValue
     * @return
     */
    public Object getDefault(String key, Object defaultValue) {
        return Optional.ofNullable(get(key)).orElse(defaultValue);
    }
 
}
复制代码

SpringMvc接收数据并写入链路上下文

有了链路上下文, 我们首先需要解决如何将数据写入链路上下文, 当然我们可以手动添加数据到链路上下文. 为了使得更加实用, 我们先定义Header中哪些数据能够自动解析到上下文中, 我们定义两种方式, 一种指定前缀标识, 如X-CHAIN-开头的, 另一种就是现实配置具体哪些key:

@ConfigurationProperties(prefix = "spring.chaincontext")
public class ChainContextProperties {

   /**
    * Keys to set in chaincontext, all keys with keyPrefix will add to chaincontext while keys is empty
    */
   private List<String> keys;

   /**
    * The prefix of all keys
    */
   private String keyPrefix = "X-CHAIN-";

   public List<String> getKeys() {
       return keys;
   }

   public void setKeys(List<String> keys) {
       this.keys = keys;
   }

   public String getKeyPrefix() {
       return keyPrefix;
   }

   public void setKeyPrefix(String keyPrefix) {
       this.keyPrefix = keyPrefix;
   }
   
}
复制代码

spring mvc通过拦截器可以轻松实现header转上下文:

public class ChainContexthandlerInterceptor implements HandlerInterceptor {
 
    private ChainContextProperties chainContextProperties;
 
    private IRequestKeyParser requestKeyParser;
 
    public ChainContexthandlerInterceptor(ChainContextProperties chainContextProperties, IRequestKeyParser requestKeyParser) {
        this.chainContextProperties = chainContextProperties;
        this.requestKeyParser = requestKeyParser;
    }
 
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (CollectionUtils.isEmpty(chainContextProperties.getKeys())
                && StringUtils.isEmpty(chainContextProperties.getKeyPrefix())) {
            // No chain context rule config
            return true;
        } else if (CollectionUtils.isEmpty(chainContextProperties.getKeys())) {
            // All keys start with keyprefix will set to chain context
            Enumeration<String> headerEnumeration = request.getHeaderNames();
            while (headerEnumeration.hasMoreElements()) {
                String header = headerEnumeration.nextElement().toUpperCase();
                if (header.startsWith(chainContextProperties.getKeyPrefix())) {
                    String value = requestKeyParser.parse(header, request);
                    if (value != null) {
                        // Add key-value to current context
                        ChainContextHolder.getCurrentContext().put(header, value);
                    }
                }
            }
        } else {
            chainContextProperties.getKeys().forEach(key -> {
                String header = chainContextProperties.getKeyPrefix() + key;
                String value = requestKeyParser.parse(header, request);
                if (value != null) {
                    // Add key-value to current context
                    ChainContextHolder.getCurrentContext().put(header, value);
                }
            });
        }
        return true;
    }
 
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
            throws Exception {
        if (ChainContextHolder.getCurrentContext() != null) {
            ChainContextHolder.getCurrentContext().unset();
        }
    }
 
}
复制代码

代码中IRequestKeyParser主要用于指定如何从header中解析数据, 默认的实现如下:

public class DefaultRequestKeyParser implements IRequestKeyParser {
 
    @Override
    public String parse(String key, HttpServletRequest request) {
        return Optional.ofNullable(request.getHeader(key)).orElse(request.getParameter(key));
    }
 
}
复制代码

RestTemplate链路上下文传递

resttemplate传递上下文主要通过拦截器实现, 在调用http请求之前将链路上下文内容写入header中.

public class ChainContextHttpRequestInterceptor implements ClientHttpRequestInterceptor {
 
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
            throws IOException {
        if (ChainContextHolder.getCurrentContext() != null) {
            ChainContextHolder.getCurrentContext().forEach((key, value) -> {
                request.getHeaders().add(key, value.toString());
            });
        }
        return execution.execute(request, body);
    }
 
}
复制代码

Feign链路上下文传递

feign也支持自己特有的拦截器做发送前逻辑处理, 如:

public class ChainContextRequestInterceptor implements RequestInterceptor {
 
    @Override
    public void apply(RequestTemplate template) {
        if (ChainContextHolder.getCurrentContext() != null) {
            ChainContextHolder.getCurrentContext().forEach((key, value) -> {
                template.header(key, value.toString());
            });
        }
    }
 
}
复制代码

Hystrix连接池传递

之前在链路上下文我们通过InheritableThreadLocal定义传递载体, 能保证当前线程以及当前线程创建的子线程能够正常传递. Hystrix通过复用连接池的方式, 导致后续线程的上下文不能正常传递, hystrix提供了HystrixConcurrencyStrategy供使用者进行扩充, 达到上下文传递效果

要使用HystrixConcurrencyStrategy我们需要先实现Callable接口, 主要是将传递进来的参数写入当前链路上下文:

public final class DelegatingChainContextCallable<V> implements Callable<V>{
 
    private final Callable<V> delegate;
    
    private Map<String, Object> chaincontextAttributes;
    
    public DelegatingChainContextCallable(Callable<V> delegate, Map<String, Object> chaincontextAttributes) {
        this.delegate = delegate;
        this.chaincontextAttributes = chaincontextAttributes;
    }
 
    @Override
    public V call() throws Exception {
        try {
            ChainContextHolder.getCurrentContext().putAll(chaincontextAttributes);
            return delegate.call();
        } finally {
            ChainContextHolder.getCurrentContext().unset();
        }
        
    }
    
}
复制代码

接着我们通过继承HystrixConcurrencyStrategy将调用线程的上下文数据传递到callable中:

public class ChainContextConcurrencyStrategy extends HystrixConcurrencyStrategy {
 
    private HystrixConcurrencyStrategy existingConcurrencyStrategy;
 
    public ChainContextConcurrencyStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
        this.existingConcurrencyStrategy = existingConcurrencyStrategy;
    }
 
    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
                : super.getBlockingQueue(maxQueueSize);
    }
 
    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getRequestVariable(rv)
                : super.getRequestVariable(rv);
    }
 
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit,
                        workQueue)
                : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
 
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties)
                : super.getThreadPool(threadPoolKey, threadPoolProperties);
    }
 
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy
                        .wrapCallable(new DelegatingChainContextCallable<T>(callable, ChainContextHolder.getCurrentContext()))
                : super.wrapCallable(new DelegatingChainContextCallable<T>(callable, ChainContextHolder.getCurrentContext()));
    }
 
}
复制代码

为了保证其他中间件在集成hystrix的策略也可以用, 需要将已有的策略代理进来:

@Configuration
@ConditionalOnClass({ Hystrix.class })
public class HystrixChainContextConfiguration {
 
    @Autowired(required = false)
    private HystrixConcurrencyStrategy existingConcurrencyStrategy;
 
    @PostConstruct
    public void init() {
        // Keeps references of existing Hystrix plugins.
        HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
        HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
 
        HystrixPlugins.reset();
 
        // Registers existing plugins excepts the Concurrent Strategy plugin.
        HystrixPlugins.getInstance()
                .registerConcurrencyStrategy(new ChainContextConcurrencyStrategy(existingConcurrencyStrategy));
        HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
        HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
        HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
    }
 
}
复制代码
关注下面的标签,发现更多相似文章
评论