1. 前言
在前两篇中,主要分析了Context、Entry、Node的作用和创建,整个调用链的形成。这篇文章我们开始分析StatisticSlot,StatisticSlot主要负责统计对资源访问的各种指标,包括请求数、请求成功数、请求线程数、响应时间等等。并且分析Sentinel是如何保存这些指标的。
2. StatisticSlot
2.1. StatisticSlot的作用
之前在分析Sentinel的调用链的时候简单讲过StatisticSlot是Sentinel中的一个功能插槽,主要是负责收集统计保存Sentinel中资源的各种指标,并且是根据不同的维度来进行统计。后续的其他功能插槽将会根据StatisticSlot中的统计指标实现自己的功能。
2.2. StatisticSlot工作原理
进入StatisticSlot功能插槽的时候,首先会执行entry()方法,代码如下:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
//先出发下一个slot作用,如果剩下的所有slot都没有抛出异常,代表没有被各种规则限制
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
//线程数加一,defaultNode和clusterNode同时累加。
node.increaseThreadNum();
//请求通过数,defaultNode和clusterNode同时累加,窗口时间
node.addPassRequest(count);
//如果来源节点不为空,则统计来源节点
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//entryType为in的,使用一个全局的clusternode来统计
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
通过源码,可以看出StatisticSlot的entry()主要工作为:
- 先调用剩下的功能插槽。
- 如果剩下的功能插槽正常执行完毕,记录请求线程数、请求通过数。
- 如果执行剩下的功能插槽出现PriorityWaitException异常,记录请求线程数。
- 如果执行剩下的功能插槽出现BlockException,记录block请求数,并且给当前的entry设置错误原因。
- 如果执行剩下的功能插槽出现其他的错误,记录异常数,并且给当前的entry设置错误原因。
在退出StatisticSlot的时候,会调用exit()方法,代码如下:
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode) context.getCurNode();
//如果没有发生异常
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
通过代码,可以看出StatisticSlot主要工作为:
- 如果没有发生异常,记录当前请求的响应时间和成功数,并且将请求线程数递减。
需要注意的是,在进入或者退出StatisticSlot的时候,会记录不同维度的数据,包括DefaultNode、ClusterNode、源节点和一个用来记录EntryType类型为In的ClusterNode。
2.3. 小结
其实StatisticSlot的工作很简单,请求进入的时候根据是否异常记录不同的信息,退出的时候记录一些信息。具体是怎么记录的,是Node自己控制实现的。
3. 滑动时间窗口
Sentinel在统计qps等指标的时候是采用滑动时间窗口实现的。
StatisticSlot在统计指标的时候调用node.addPassRequest(count)等方法进行指标的累加,依次看下去,最终会在StatisticNode定义两个时间级别的滑动时间窗口进行数据的保存和统计。
/**
* 持有秒级别的统计数据指标,默认一秒,样本数2
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 持有分钟级别的数据指标,单位一分钟,样本数量为60,即每秒一个样本
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
3.1. Metric
Metric是一个接口,表示资源的各种指标的基本结构,里面定义了获取和累加各种指标的方法,指标包括请求成功数、请求异常数、请阻塞数、请求数、响应时间。
3.2. ArrayMetric
ArrayMetric实现了Metric接口,实现了里面所有的方法。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
...
}
上面是ArrayMetric的定义,ArrayMetric中用一个LeapArray来保存统计的指标信息,LeapArray是一个抽象类,默认的实现是OccupiableBucketLeapArray,但可以根据构造器指定为BucketLeapArray。至于LeapArray接下来再讲。
我们以请求成功数来看源码是怎么实现的:
@Override
public long success() {
data.currentWindow();
long success = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
可以看到,Sentinel先去获取当前的时间窗口,然后获取所有的
3.3. LeapArray
LeapArray是保存时间窗口的一个数据结构,定义如下:
/**
* 单个滑动时间窗口的大小
*/
protected int windowLengthInMs;
/**
*总共的窗口数
*/
protected int sampleCount;
/**
* 总的时间大小
*/
protected int intervalInMs;
/**
* 采样的时间窗口数组
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
LeapArray中用一个AtomicReferenceArray数组来保存每个时间窗口内的数据,每个时间窗口的大小为windowLengthInMs,总共的时间区间为intervalInMs,所以总共的窗口数量为intervalInMs/windowLengthInMs。
LeapArray中用一个WindowWrap类型的数组,WindowWrap表示每一个时间窗口的数据,包括时间窗口的大小、开始时间、统计的数据。
public class WindowWrap<T> {
/**以毫秒为单位的时间窗口长度
* Time length of a single window bucket in milliseconds.
*/
private final long windowLengthInMs;
/**
* 开始时间
* Start timestamp of the window in milliseconds.
*/
private long windowStart;
/**
* 统计的数据
* Statistic data.
*/
private T value;
}
LeapArray中有一个方法非常的重要,currentWindow(),这个方法会根据传入的时间获取具体的时间窗口对象,也就是WindowWrap,如果没有对应的时间的时间窗口,则新建一个。后续记录请求成功数、异常数等,都是通过这个对象来进行操作的。
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
以上就是获取指定时间的时间窗口对象的方法,主要逻辑如下:
- 根据传入的时间戳计算对应的数组下标,获取对应的WindowWrap。
- 根据传入的时间戳计算对应的窗口开始时间。
- 如果对应的WindowWrap不存在,新建一个WindowWrap。
- 如果对应的WindowWrap存在,并且WindowWrap的开始时间和计算出来的开始时间一致,则直接返回。
- 如果对应的WindowWrap存在,并且WindowWrap的开始时间小于计算出来的开始时间,则重置该时间窗口。
需要注意的是,currentWindow()方法中调用的newEmptyBucket()和resetWindowTo()方法是一个抽象的方法,具体的方法是在其子类中实现。LeapArray的子类比较多,但是从ArrayMetric中可疑看出是使用OccupiableBucketLeapArray作为实现类,所以这两个方法的具体实现是在OccupiableBucketLeapArray中。
总体来说LeapArray是保存一个时间区间内的数据,每一段时间区间内的数据具体用WindowWrap来保存。LeapArray中有一个方法currentWindow(),这个方法是用来获取或者创建特定时间的时间窗口结构的。后续数据的统计操作就是根据该时间窗口来进行的。
3.3. MetricBucket
LeapArray是一个泛型类型的对象,在ArrayMetric中新建LeapArrat的时候,传入具体的类型为MetricBucket,也就是说WindowWrap中的data是MetricBucket类型的。MetricBucket才是真正记录各种指标的数据结构。
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
/**
* Reset the adders.
*
* @return new metric bucket in initial state
*/
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
...
}
MertricBucket使用一个LongAdder数组来保存各种指标,看MertricBucket的构造函数可看出,数组是在这个时候初始化的,数组的每一个元素代表一个指标,并且顺序是固定的,下标是根据MetricEvent中各个枚举的定义顺序来指定的。不管是获取还是修改指标,都是操作这个数组中对应的元素来进行的。
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
4. 小结
本篇文章主要是分析一下StatisticSlot具体的工作流程,以及Sentinel使用滑动时间窗口进行数据的保存。统计数据的主要流程大致如下:
在前三篇文章中,我们分析了Sentinel主要的工作流程,调用链路,还分析了Sentinel统计数据指标的维度原理和数据保存原理。接下来将会分析Sentinel的其他功能插槽,比如限流和降级等功能是如何实现的。