sentinel简介
sentinel是alibaba开源的流控组件,以流量为核心,提供系统流量控制,熔断降级,系统保护等功能,保证系统的平稳运行。目前主要的流量控制手段主要有两种,一种是以Hystrix为代表,基于线程池隔离的方式,另一种则是通过信号量的方式,sentinel就是此方式来实现流控的。
使用介绍
本文不会对其详细的使用深入介绍,具体的可以参考 github
简单使用步骤:
1.引入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.7.2</version>
</dependency>
2.在代码中定义资源和规则
资源:在项目里的一段代码,一个方法等一切东西
规则:你要对资源定义的流控规则,如qps等指标
public static void main(String[] args) {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*您的业务逻辑 - 开始*/
System.out.println("hello world");
/*您的业务逻辑 - 结束*/
} catch (BlockException e1) {
/*流控逻辑处理 - 开始*/
System.out.println("block!");
/*流控逻辑处理 - 结束*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}
上述代码中业务代码被定义名为"HelloWorld"的资源,该资源的qps为20。
原理分析
从上面的代码可以看出,限流的入口是从一个叫Entry的对象开始的,Entry顾名思义可以理解为通行入口的意思。通过入口来把我们的代码即资源保护起来,任何请求到来时都要先经过入口的检查,如果检查通过了才能放行,否则的话拒绝放行。而通行规则则有开发人员自己来定。下图来源于sentinel官方,此图大概描述了sentinel运行的原理图。对于初次接触sentinel的人来说看到此图可能会比较懵逼,这里先简单说下大概的工作流程
当我们有请求到来时,Entry会为每一个资源创建一个处理链ProcessorSlotChain,系统默认提供了8个Handler构成了此处理链,每个handler各司其职完成相应的功能,当然我们的流量校验处理对象也在其中名为FlowSlot,如果请求到来时能够通过ProcessorSlotChain的校验的话,就放行此请求,如果不通过,就会抛出相应的异常。Sentinel是以流量控制为核心,底层的流量统计是以滑动窗口来完成qps统计的,具体实现是通过名为LeapArray的对象来完成的。处理链如下图所示:
核心概念和类
在分析具体的源码之前,先介绍几个比较核心的概念和对象,不然进入代码会比较生涩。
Entry:前面说过了,对于要保护的资源须用Entry包裹起来即:
Entry entry = SphU.entry("HelloWorld");
......//保护的资源
entry.exit();
Context:上下文,每个entry都是存在特定上下文中,它是一个ThreadLocal变量,其内部通过name字段来区分不同的上下文,一个上下文即代表一个EntranceNode;
ResourceWrapper:资源包装类,上面SphU.entry("HelloWorld")语句实际上创建了一个HelloWorld的ResourceWrapper,资源具有全局唯一性
public abstract class ResourceWrapper {
protected final String name; //资源名
......
}
ProcessorSlotChain: 这个是个核心组件,其各种限流,熔断等功能都是通过此对象来实现的。内部是一个个的Slot对象,每个Slot对象完成各自对应的功能,其Chain的构建是通过Spi的方式来构建的。
# Sentinel default ProcessorSlots
//为每个ProcessorSlotChain提供Node对象
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
//为每个ProcessorSlotChain提clusterNode对象和originNode
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
//记录日志
com.alibaba.csp.sentinel.slots.logger.LogSlot
//统计流量
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
//系统规则校验保护
com.alibaba.csp.sentinel.slots.system.SystemSlot
//认证校验
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
//流控检查
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
//熔断降级检测
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
除了上面系统默认提供的8个Slot对象外,开发者还可以自己定义相关的Slot对象,按照SPI的方式加入即可。
Node: 用于完成统计相关的功能,ProcessorSlotChain中的第一个Slot对象NodeSelectorSlot就是用于创建或获取Node对象,后续的每个Slot都会透传此Node对象,用于统计相关功能。
核心源码分析
有了上面几个比较核心的概念后,下面正式进入源码的分析。
1. 入口创建 SphU
此类提供了创建Entry的api,类似的类还有一个叫做SphO,两个类的区别在于前者是通过抛出异常的方式来拒绝请求通过,而后者则是通过返回Bool类型的结果来表示结果。
SphU
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
SphO
public static boolean entry(String name) {
return entry(name, EntryType.OUT, 1, OBJECTS0);
}
后面我们已SphU为入口来分析,在SphU的entry方法中调用了Env类:
public class Env {
//创建CtSph对象,用于统计和规则校验
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
//初始化InitFunc相关接口,通过SPI定义
InitExecutor.doInit();
}
}
进入CtSph的entry方法,此类中entry方法有多个重载,我们只分析一个即可,其他的都是一样的
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
2.创建conext和processorSlotChain对象
在上面的方法中我们可以看出,每个entry都会关联一个资源,资源通过name和type来唯一关联。接着代码往下走,最后会进入一个entryWithPriority的方法,此方法是一个很重要的方法,在方法类会创建上下文对象Context,处理链对象ProcessorSlotChain,也是规则校验的入口
/**
* resourceWrapper:资源
* count:请求许可
* prioritized:优先级
* args:额外携带的参数
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//获取上下文对象,一个ThreadLocal变量
Context context = ContextUtil.getContext();
//如果创建的上下文数量达到上限(2000),会返回一个NullContext对象
if (context instanceof NullContext) {
//创建一个不执行规则校验的entry对象
return new CtEntry(resourceWrapper, null, context);
}
//第一次进入会走到走到这里
if (context == null) {
//创建上下文对象,并设置默认名称为:sentinel_default_context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
//全局控制开关,如果关闭则不执行规则校验
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//创建或获取ProcessorSlotChain对象,用于执行规则校验
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//如果走到这里,意味着创建了太多的资源,默认不能超过6000
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
//走到这里,意味着上下文对象创建和ProcessorSlotChain对象创建都ok
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//进入规则校验,如果不通过则会抛出异常
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
//限流触发的异常
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
上面的代码实际上就包含了整个流控规则的校验流程。下面来看看上下文对象Context的创建
1.ContextUtil.getContext()
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
......
public static Context getContext() {
return contextHolder.get();
}
可以看到context是一个线程本地变量,第一次进入的时候返回空,在ContextUtil类的trueEnter方法中会创建新的context对象
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
//获取上下文node缓存,key为context的name
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
//获取上下文入口EntranceNode
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
//二次校验
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
//创建入口EntranceNode,每个上下文都有唯一的一个入口
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 将创建的entranceNode挂载到根node下
Constants.ROOT.addChild(node);
//将新创建的node加入到缓冲中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
//创建上下文
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
上述代码就是context的创建的全过程,每个context都是线程本地变量,并且都会关联一个EntranceNode,并将其挂载根node节点下面。context对象创建完毕后就会创建ProcessorSlotChain对象,我们回到上面entryWithPriority方法中的lookProcessChain方法
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//先从缓存中查询获取对象
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//通过SPI的方式创建对象
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
ProcessorSlotChain对象的创建是通过spi的方式,其接口实现类定义在如下文件中,将文件中的对象按照约定顺序组织起来就形成了ProcessorSlotChain对象,具体就不深入进去了。当相关的对象都创建好了以后就是具体的规则校验了,回到entryWithPriority方法中的chain.entry(context, resourceWrapper, null, count, prioritized, args)这行代码来,此方法就是规则校验的入口。进入处理链的一个对象是NodeSelectorSlot,来看看源码
@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
//DefaultNode缓存,key为context的名称
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
//创建node
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
这段代码还是比较简单,根据上下文的名字获取DefalutNode,如果没有则创建。这里先梳理一下context、entry、ProcesssorSlotChain、和DefaultNode的关系。
- context代表的是一个上下文,每个entry都是在某个具体的context下运行的,同时每个context都会有一个EntranceNode;
- 每个entry都关联一个具体resource;
- 每个resource都会有一个ProcesssorSlotChain来做规则校验;
- 每个ProcessSlotChain可以包含多个DefaultNode,但只会有一个clusterNode;
node的关系图如下(来自官网): EntranceNode1、EntranceNode2分别代表两个上下文环境的入口node。处理链中第二个对象是ClusterBuilderSlot,此对象的作用维护ClusterNode和originNode,clusterNode的作用是针对resource的所有上下文来统计的,originNode的作用是针对具有origin属性的entry来说的,来看看主要代码
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
//创建clusterNode
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
//node为当前上下文的entranceNode
node.setClusterNode(clusterNode);
//如果上下文设置了origin属性
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
接下来的处理器就是LogSlot,此类是功能主要就是记录异常的日志,这里就不细说了。下一个处理器StatisticSlot是非常重要的一个对象,它维护着流量、线程、异常等统计信息。代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 后续流控、熔断降级等校验
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果校验通过会走到这里,增加线程和pass的计数统计
node.increaseThreadNum();
node.addPassRequest(count);
//如果设置了origin,增加originNode统计
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//增加全局计数统计
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 调用回调
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// 增加block计数统计
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
接下来的SystemSlot和AuthoritySlot这里就不做介绍了。重点说下FlowSlot这个类,整个流控的校验都是在这里面进行的。代码比较简单:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
进入checkFlow方法,最终会调用FlowRuleChecker的checkFlow方法:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//获取流控规则,从这里可以看出,如果设置了多个规则,会逐一校验每一个规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
进入canPassCheck方法,
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,进行集群模式校验
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//本地校验
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
//选取合适的node,根据其统计技术来判断是否通过
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
最后会调用流量整形控制器的canPass方法,这里看下默认的流量整形控制器DefaultController的实现
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取已使用的许可
int curCount = avgUsedTokens(node);
//如果 当前已使用许可 + 请求许可 > 设置的数量
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
//采用滑动窗口来统计,每个窗口分割成了多个小的窗体,通过判断后续窗体的容量来进行流控
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
后续还有DegradeSlot关于熔断降级的,本章就不分析了,留在后续的文章中来进行分析。 上述的过程分析了sentinel限流的原理和主要工作流程。其核心在于ProcessorSlotChain,把握住它就可以抓住其主脉络。