Sentinel源码分析(第三篇):StatisticSlot分析

2,749 阅读8分钟

1. 前言

在前两篇中,主要分析了Context、Entry、Node的作用和创建,整个调用链的形成。这篇文章我们开始分析StatisticSlot,StatisticSlot主要负责统计对资源访问的各种指标,包括请求数、请求成功数、请求线程数、响应时间等等。并且分析Sentinel是如何保存这些指标的。

2. StatisticSlot

2.1. StatisticSlot的作用

之前在分析Sentinel的调用链的时候简单讲过StatisticSlot是Sentinel中的一个功能插槽,主要是负责收集统计保存Sentinel中资源的各种指标,并且是根据不同的维度来进行统计。后续的其他功能插槽将会根据StatisticSlot中的统计指标实现自己的功能。

2.2. StatisticSlot工作原理

进入StatisticSlot功能插槽的时候,首先会执行entry()方法,代码如下:

 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
        boolean prioritized, Object... args) throws Throwable {

        try {
            // Do some checking.
            //先出发下一个slot作用,如果剩下的所有slot都没有抛出异常,代表没有被各种规则限制
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            //线程数加一,defaultNode和clusterNode同时累加。
            node.increaseThreadNum();
            //请求通过数,defaultNode和clusterNode同时累加,窗口时间
            node.addPassRequest(count);

            //如果来源节点不为空,则统计来源节点
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            //entryType为in的,使用一个全局的clusternode来统计
            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
    }

通过源码,可以看出StatisticSlot的entry()主要工作为:

  • 先调用剩下的功能插槽。
  • 如果剩下的功能插槽正常执行完毕,记录请求线程数、请求通过数。
  • 如果执行剩下的功能插槽出现PriorityWaitException异常,记录请求线程数。
  • 如果执行剩下的功能插槽出现BlockException,记录block请求数,并且给当前的entry设置错误原因。
  • 如果执行剩下的功能插槽出现其他的错误,记录异常数,并且给当前的entry设置错误原因。

在退出StatisticSlot的时候,会调用exit()方法,代码如下:


    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

        DefaultNode node = (DefaultNode) context.getCurNode();

        //如果没有发生异常
        if (context.getCurEntry().getError() == null) {
            // Calculate response time (max RT is TIME_DROP_VALVE).
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            // Record response time and success count.
            node.addRtAndSuccess(rt, count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // Error may happen.
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        fireExit(context, resourceWrapper, count);
    }

通过代码,可以看出StatisticSlot主要工作为:

  • 如果没有发生异常,记录当前请求的响应时间和成功数,并且将请求线程数递减。

需要注意的是,在进入或者退出StatisticSlot的时候,会记录不同维度的数据,包括DefaultNode、ClusterNode、源节点和一个用来记录EntryType类型为In的ClusterNode。

2.3. 小结

其实StatisticSlot的工作很简单,请求进入的时候根据是否异常记录不同的信息,退出的时候记录一些信息。具体是怎么记录的,是Node自己控制实现的。

3. 滑动时间窗口

Sentinel在统计qps等指标的时候是采用滑动时间窗口实现的。

StatisticSlot在统计指标的时候调用node.addPassRequest(count)等方法进行指标的累加,依次看下去,最终会在StatisticNode定义两个时间级别的滑动时间窗口进行数据的保存和统计。

  /**
     * 持有秒级别的统计数据指标,默认一秒,样本数2
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * 持有分钟级别的数据指标,单位一分钟,样本数量为60,即每秒一个样本
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

3.1. Metric

Metric是一个接口,表示资源的各种指标的基本结构,里面定义了获取和累加各种指标的方法,指标包括请求成功数、请求异常数、请阻塞数、请求数、响应时间。

3.2. ArrayMetric

ArrayMetric实现了Metric接口,实现了里面所有的方法。

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    ...
}

上面是ArrayMetric的定义,ArrayMetric中用一个LeapArray来保存统计的指标信息,LeapArray是一个抽象类,默认的实现是OccupiableBucketLeapArray,但可以根据构造器指定为BucketLeapArray。至于LeapArray接下来再讲。

我们以请求成功数来看源码是怎么实现的:

 @Override
    public long success() {
        data.currentWindow();
        long success = 0;

        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            success += window.success();
        }
        return success;
    }

可以看到,Sentinel先去获取当前的时间窗口,然后获取所有的

3.3. LeapArray

LeapArray是保存时间窗口的一个数据结构,定义如下:

    /**
     * 单个滑动时间窗口的大小
     */
    protected int windowLengthInMs;

    /**
     *总共的窗口数
     */
    protected int sampleCount;

    /**
     * 总的时间大小
     */
    protected int intervalInMs;

    /**
     * 采样的时间窗口数组
     */
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();

LeapArray中用一个AtomicReferenceArray数组来保存每个时间窗口内的数据,每个时间窗口的大小为windowLengthInMs,总共的时间区间为intervalInMs,所以总共的窗口数量为intervalInMs/windowLengthInMs。

LeapArray中用一个WindowWrap类型的数组,WindowWrap表示每一个时间窗口的数据,包括时间窗口的大小、开始时间、统计的数据。

public class WindowWrap<T> {

    /**以毫秒为单位的时间窗口长度
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;

    /**
     * 开始时间
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;

    /**
     * 统计的数据
     * Statistic data.
     */
    private T value;

}

LeapArray中有一个方法非常的重要,currentWindow(),这个方法会根据传入的时间获取具体的时间窗口对象,也就是WindowWrap,如果没有对应的时间的时间窗口,则新建一个。后续记录请求成功数、异常数等,都是通过这个对象来进行操作的。

 public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

以上就是获取指定时间的时间窗口对象的方法,主要逻辑如下:

  • 根据传入的时间戳计算对应的数组下标,获取对应的WindowWrap。
  • 根据传入的时间戳计算对应的窗口开始时间。
  • 如果对应的WindowWrap不存在,新建一个WindowWrap。
  • 如果对应的WindowWrap存在,并且WindowWrap的开始时间和计算出来的开始时间一致,则直接返回。
  • 如果对应的WindowWrap存在,并且WindowWrap的开始时间小于计算出来的开始时间,则重置该时间窗口。

需要注意的是,currentWindow()方法中调用的newEmptyBucket()和resetWindowTo()方法是一个抽象的方法,具体的方法是在其子类中实现。LeapArray的子类比较多,但是从ArrayMetric中可疑看出是使用OccupiableBucketLeapArray作为实现类,所以这两个方法的具体实现是在OccupiableBucketLeapArray中。

总体来说LeapArray是保存一个时间区间内的数据,每一段时间区间内的数据具体用WindowWrap来保存。LeapArray中有一个方法currentWindow(),这个方法是用来获取或者创建特定时间的时间窗口结构的。后续数据的统计操作就是根据该时间窗口来进行的。

3.3. MetricBucket

LeapArray是一个泛型类型的对象,在ArrayMetric中新建LeapArrat的时候,传入具体的类型为MetricBucket,也就是说WindowWrap中的data是MetricBucket类型的。MetricBucket才是真正记录各种指标的数据结构。

public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    
    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }
 ...
}

MertricBucket使用一个LongAdder数组来保存各种指标,看MertricBucket的构造函数可看出,数组是在这个时候初始化的,数组的每一个元素代表一个指标,并且顺序是固定的,下标是根据MetricEvent中各个枚举的定义顺序来指定的。不管是获取还是修改指标,都是操作这个数组中对应的元素来进行的。

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

4. 小结

本篇文章主要是分析一下StatisticSlot具体的工作流程,以及Sentinel使用滑动时间窗口进行数据的保存。统计数据的主要流程大致如下:

在前三篇文章中,我们分析了Sentinel主要的工作流程,调用链路,还分析了Sentinel统计数据指标的维度原理和数据保存原理。接下来将会分析Sentinel的其他功能插槽,比如限流和降级等功能是如何实现的。