前言
在学习Hystrix
的请求缓存与请求合并过程中,不禁产生疑问,如何实现基于一个类似于“ThreadLocal
变量”,但上下文运用范围为Request
维度,也就是“HystrixRequestContext
”。
产生这个疑问的原因
-
一个
Request
,即Command
可以由多个线程执行,举个例子,我们使用线程A和线程B处理TestCommand
,应该属于两次TestCommand
执行,A线程和B线程是怎么共用的缓存,减少了额外的逻辑执行? -
结合Hystrix请求合并与请求缓存(一):请求缓存、Hystrix请求合并与请求缓存(二):请求合并 中的分析,
HystrixRequestContext
怎么如注释所说,实现request scoped
?
Contains the state and manages the lifecycle of {@link HystrixRequestVariableDefault} objects that provide request scoped (rather than only thread scoped) variables so that multiple threads within a single request can share state.
以请求合并(REQUEST维度)为例
- 前文中,提到请求合并是通过
HystrixCollapser
来实现的,执行合并请求通过使用HystrixContextCallable
来实现。 - 源码中,
CollapsedTask
构造器中一段注释,即运行过程中,使用上一级线程的上下文,例如Tomcat线程。
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
// HystrixContextRunnable是个Runnable,一个可用于执行的任务
public class HystrixContextRunnable implements Runnable {
private final Callable<Void> actual;
private final HystrixRequestContext parentThreadState;
public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
// 获取当前线程的HystrixRequestContext
this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
}
// 关键的构造器
public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
// 将原始Callable装饰, 创建了一个新的callable
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
actual.run();
return null;
}
});
// 存储当前线程的hystrixRequestContext
this.parentThreadState = hystrixRequestContext;
}
@Override
public void run() {
// 运行实际的Runnable之前先保存当前线程已有的HystrixRequestContext
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// 设置当前线程的HystrixRequestContext,来自上一级线程,因此两个线程是同一个HystrixRequestContext
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// 还原当前线程的HystrixRequestContext
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
- 从此可以获知,在子线程执行具体代码前,将当前线程的
HystrixRequestContext
替换为了上一级线程的HystrixRequestContext
,执行后还原,实现了HystrixRequestContext
的传递。
简化DEMO
DemoRequestContext
public class DemoRequestContext {
// ThreadLoacal
private static ThreadLocal<DemoRequestContext> demo = new ThreadLocal<>();
// 线程上下文,存储map
private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
private DemoRequestContext() {
}
// 初始化上下文
public static DemoRequestContext initializeContext() {
DemoRequestContext context = new DemoRequestContext();
demo.set(context);
return context;
}
// 设置当前线程的上下文
public static void setContextOnCurrentThread(DemoRequestContext state) {
demo.set(state);
}
// 获取当前线程的上下文
public static DemoRequestContext getContextForCurrentThread() {
DemoRequestContext context = demo.get();
if (context != null && context.concurrentHashMap != null) {
return context;
} else {
return null;
}
}
// 存储当前线程需要存储的数据 key-value
public void set(String key, String value) {
if (DemoRequestContext.getContextForCurrentThread() == null) {
throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
DemoRequestContext.getContextForCurrentThread().concurrentHashMap.put(key, value);
}
// 根据key获取当前线程上下文中的值
public String get(String key) {
if (DemoRequestContext.getContextForCurrentThread() == null) {
throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<String, String> variableMap = DemoRequestContext.getContextForCurrentThread().concurrentHashMap;
return variableMap.get(key);
}
}
DemoContextCallable
public class DemoContextCallable<V> implements Callable<V> {
private final Callable<V> callable;
private final DemoRequestContext parentThreadState;
// 构造时 将上一级线程上下文注入
public DemoContextCallable(Callable<V> callable) {
this.callable = callable;
this.parentThreadState = DemoRequestContext.getContextForCurrentThread();
}
// 替换线程上下文的操作
@Override
public V call() throws Exception {
DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
try {
DemoRequestContext.setContextOnCurrentThread(parentThreadState);
return callable.call();
} finally {
DemoRequestContext.setContextOnCurrentThread(context);
}
}
}
测试类
public class ContextTest {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 主线程的context
DemoRequestContext.initializeContext();
// 主线程存储的key-value
DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
context.set("name", "parentThread");
DemoContextCallable<String> contextCallable = new DemoContextCallable<String>(new Callable<String>() {
@Override
public String call() throws Exception {
// 子线程中取出上下文内容
return DemoRequestContext.getContextForCurrentThread().get("name");
}
});
// 线程池运行
List<Future<String>> list = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Future<String> future= executorService.submit(contextCallable);
list.add(future);
}
for (Future<String> future : list) {
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
输出
parentThread
parentThread
parentThread
总结
一直对HystrixRequestContext
的实现原理很困惑,官方推荐是在Filter
中初始化并shutdown
,因此有以下理解。
HystrixRequestContext
是一次HttpRequest
中的上下文,一次请求中可能有有多个线程执行多个Command
。HystrixRequestCache
和HystrixRequestContext
生命周期一致,都是一次HttpRequest
。