从HystrixRequestContext理解线程间上下文传递

1,574 阅读3分钟

前言

在学习Hystrix的请求缓存与请求合并过程中,不禁产生疑问,如何实现基于一个类似于“ThreadLocal变量”,但上下文运用范围为Request维度,也就是“HystrixRequestContext”。

产生这个疑问的原因

Contains the state and manages the lifecycle of {@link HystrixRequestVariableDefault} objects that provide request scoped (rather than only thread scoped) variables so that multiple threads within a single request can share state.

以请求合并(REQUEST维度)为例

  • 前文中,提到请求合并是通过HystrixCollapser来实现的,执行合并请求通过使用HystrixContextCallable来实现。
  • 源码中,CollapsedTask构造器中一段注释,即运行过程中,使用上一级线程的上下文,例如Tomcat线程
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
// HystrixContextRunnable是个Runnable,一个可用于执行的任务
public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        // 获取当前线程的HystrixRequestContext
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    // 关键的构造器
    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        
        // 将原始Callable装饰, 创建了一个新的callable
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }
        });
        // 存储当前线程的hystrixRequestContext
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        // 运行实际的Runnable之前先保存当前线程已有的HystrixRequestContext
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // 设置当前线程的HystrixRequestContext,来自上一级线程,因此两个线程是同一个HystrixRequestContext
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // 还原当前线程的HystrixRequestContext
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}
  • 从此可以获知,在子线程执行具体代码前,将当前线程的HystrixRequestContext替换为了上一级线程的HystrixRequestContext,执行后还原,实现了HystrixRequestContext的传递。

简化DEMO

DemoRequestContext

public class DemoRequestContext {

    // ThreadLoacal
    private static ThreadLocal<DemoRequestContext> demo = new ThreadLocal<>();

    // 线程上下文,存储map
    private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();

    private DemoRequestContext() {
    }

    // 初始化上下文
    public static DemoRequestContext initializeContext() {
        DemoRequestContext context = new DemoRequestContext();
        demo.set(context);
        return context;
    }

    // 设置当前线程的上下文
    public static void setContextOnCurrentThread(DemoRequestContext state) {
        demo.set(state);
    }

    // 获取当前线程的上下文
    public static DemoRequestContext getContextForCurrentThread() {
        DemoRequestContext context = demo.get();

        if (context != null && context.concurrentHashMap != null) {
            return context;
        } else {
            return null;
        }
    }

    // 存储当前线程需要存储的数据 key-value
    public void set(String key, String value) {
        if (DemoRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        DemoRequestContext.getContextForCurrentThread().concurrentHashMap.put(key, value);
    }

    // 根据key获取当前线程上下文中的值
    public String get(String key) {
        if (DemoRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<String, String> variableMap = DemoRequestContext.getContextForCurrentThread().concurrentHashMap;
        return variableMap.get(key);
    }

}

DemoContextCallable

public class DemoContextCallable<V> implements Callable<V> {

    private final Callable<V> callable;
    private final DemoRequestContext parentThreadState;

    // 构造时 将上一级线程上下文注入
    public DemoContextCallable(Callable<V> callable) {
        this.callable = callable;
        this.parentThreadState = DemoRequestContext.getContextForCurrentThread();
    }

    // 替换线程上下文的操作
    @Override
    public V call() throws Exception {
        DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
        try {
            DemoRequestContext.setContextOnCurrentThread(parentThreadState);
            return callable.call();
        } finally {
            DemoRequestContext.setContextOnCurrentThread(context);
        }
    }
}

测试类

public class ContextTest {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    public static void main(String[] args) {
        // 主线程的context
        DemoRequestContext.initializeContext();

        // 主线程存储的key-value
        DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
        context.set("name", "parentThread");

        DemoContextCallable<String> contextCallable = new DemoContextCallable<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 子线程中取出上下文内容
                return DemoRequestContext.getContextForCurrentThread().get("name");
            }
        });

        // 线程池运行
        List<Future<String>> list = Lists.newArrayList();
        for (int i = 0; i < 3; i++) {
            Future<String> future= executorService.submit(contextCallable);
            list.add(future);
        }

        for (Future<String> future : list) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

    }
}

输出

parentThread
parentThread
parentThread

总结

一直对HystrixRequestContext的实现原理很困惑,官方推荐是在Filter中初始化并shutdown,因此有以下理解。

HystrixRequestContext是一次HttpRequest中的上下文,一次请求中可能有有多个线程执行多个CommandHystrixRequestCacheHystrixRequestContext生命周期一致,都是一次HttpRequest

参考文献