sentinel核心源码分析

2,408 阅读6分钟

sentinel简介

sentinel是alibaba开源的流控组件,以流量为核心,提供系统流量控制,熔断降级,系统保护等功能,保证系统的平稳运行。目前主要的流量控制手段主要有两种,一种是以Hystrix为代表,基于线程池隔离的方式,另一种则是通过信号量的方式,sentinel就是此方式来实现流控的。

使用介绍

本文不会对其详细的使用深入介绍,具体的可以参考 github

简单使用步骤:

1.引入依赖

 <dependency>
     <groupId>com.alibaba.csp</groupId>
     <artifactId>sentinel-core</artifactId>
     <version>1.7.2</version>
 </dependency>

2.在代码中定义资源和规则

资源:在项目里的一段代码,一个方法等一切东西

规则:你要对资源定义的流控规则,如qps等指标

  public static void main(String[] args) {
     initFlowRules();
     while (true) {
         Entry entry = null;
         try {
         entry = SphU.entry("HelloWorld");
             /*您的业务逻辑 - 开始*/
             System.out.println("hello world");
             /*您的业务逻辑 - 结束*/
     } catch (BlockException e1) {
             /*流控逻辑处理 - 开始*/
         System.out.println("block!");
             /*流控逻辑处理 - 结束*/
     } finally {
        if (entry != null) {
            entry.exit();
        }
     }
 }
  
 private static void initFlowRules(){
       List<FlowRule> rules = new ArrayList<>();
       FlowRule rule = new FlowRule();
       rule.setResource("HelloWorld");
       rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
       // Set limit QPS to 20.
       rule.setCount(20);
       rules.add(rule);
       FlowRuleManager.loadRules(rules);
 }
}

上述代码中业务代码被定义名为"HelloWorld"的资源,该资源的qps为20。

原理分析

从上面的代码可以看出,限流的入口是从一个叫Entry的对象开始的,Entry顾名思义可以理解为通行入口的意思。通过入口来把我们的代码即资源保护起来,任何请求到来时都要先经过入口的检查,如果检查通过了才能放行,否则的话拒绝放行。而通行规则则有开发人员自己来定。下图来源于sentinel官方,此图大概描述了sentinel运行的原理图。对于初次接触sentinel的人来说看到此图可能会比较懵逼,这里先简单说下大概的工作流程

当我们有请求到来时,Entry会为每一个资源创建一个处理链ProcessorSlotChain,系统默认提供了8个Handler构成了此处理链,每个handler各司其职完成相应的功能,当然我们的流量校验处理对象也在其中名为FlowSlot,如果请求到来时能够通过ProcessorSlotChain的校验的话,就放行此请求,如果不通过,就会抛出相应的异常。Sentinel是以流量控制为核心,底层的流量统计是以滑动窗口来完成qps统计的,具体实现是通过名为LeapArray的对象来完成的。处理链如下图所示:

核心概念和类

在分析具体的源码之前,先介绍几个比较核心的概念和对象,不然进入代码会比较生涩。

Entry:前面说过了,对于要保护的资源须用Entry包裹起来即:

 Entry entry = SphU.entry("HelloWorld");
  ......//保护的资源
 entry.exit();

Context:上下文,每个entry都是存在特定上下文中,它是一个ThreadLocal变量,其内部通过name字段来区分不同的上下文,一个上下文即代表一个EntranceNode;

ResourceWrapper:资源包装类,上面SphU.entry("HelloWorld")语句实际上创建了一个HelloWorld的ResourceWrapper,资源具有全局唯一性

public abstract class ResourceWrapper {
 	protected final String name; //资源名
    ......
}

ProcessorSlotChain: 这个是个核心组件,其各种限流,熔断等功能都是通过此对象来实现的。内部是一个个的Slot对象,每个Slot对象完成各自对应的功能,其Chain的构建是通过Spi的方式来构建的。

# Sentinel default ProcessorSlots

//为每个ProcessorSlotChain提供Node对象
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot

//为每个ProcessorSlotChain提clusterNode对象和originNode
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot

//记录日志
com.alibaba.csp.sentinel.slots.logger.LogSlot

//统计流量
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot

//系统规则校验保护
com.alibaba.csp.sentinel.slots.system.SystemSlot

//认证校验
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot

//流控检查
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot

//熔断降级检测
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

除了上面系统默认提供的8个Slot对象外,开发者还可以自己定义相关的Slot对象,按照SPI的方式加入即可。

Node: 用于完成统计相关的功能,ProcessorSlotChain中的第一个Slot对象NodeSelectorSlot就是用于创建或获取Node对象,后续的每个Slot都会透传此Node对象,用于统计相关功能。

核心源码分析

有了上面几个比较核心的概念后,下面正式进入源码的分析。

1. 入口创建 SphU

此类提供了创建Entry的api,类似的类还有一个叫做SphO,两个类的区别在于前者是通过抛出异常的方式来拒绝请求通过,而后者则是通过返回Bool类型的结果来表示结果。

SphU

  public static Entry entry(String name) throws BlockException {
      return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
  }

SphO

  public static boolean entry(String name) {
      return entry(name, EntryType.OUT, 1, OBJECTS0);
  }

后面我们已SphU为入口来分析,在SphU的entry方法中调用了Env类:

public class Env {
	//创建CtSph对象,用于统计和规则校验
    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        //初始化InitFunc相关接口,通过SPI定义
        InitExecutor.doInit();
    }
}

进入CtSph的entry方法,此类中entry方法有多个重载,我们只分析一个即可,其他的都是一样的

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

2.创建conext和processorSlotChain对象

在上面的方法中我们可以看出,每个entry都会关联一个资源,资源通过name和type来唯一关联。接着代码往下走,最后会进入一个entryWithPriority的方法,此方法是一个很重要的方法,在方法类会创建上下文对象Context,处理链对象ProcessorSlotChain,也是规则校验的入口

/**
* resourceWrapper:资源
* count:请求许可
* prioritized:优先级
* args:额外携带的参数
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        //获取上下文对象,一个ThreadLocal变量
        Context context = ContextUtil.getContext();
        //如果创建的上下文数量达到上限(2000),会返回一个NullContext对象
        if (context instanceof NullContext) {
			//创建一个不执行规则校验的entry对象
            return new CtEntry(resourceWrapper, null, context);
        }
		//第一次进入会走到走到这里
        if (context == null) {
            //创建上下文对象,并设置默认名称为:sentinel_default_context
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
		//全局控制开关,如果关闭则不执行规则校验
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
		//创建或获取ProcessorSlotChain对象,用于执行规则校验
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

		//如果走到这里,意味着创建了太多的资源,默认不能超过6000
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
		//走到这里,意味着上下文对象创建和ProcessorSlotChain对象创建都ok
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
        	//进入规则校验,如果不通过则会抛出异常
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
        	//限流触发的异常
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

上面的代码实际上就包含了整个流控规则的校验流程。下面来看看上下文对象Context的创建

1.ContextUtil.getContext()

	private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
    ......
    public static Context getContext() {
        return contextHolder.get();
    }

可以看到context是一个线程本地变量,第一次进入的时候返回空,在ContextUtil类的trueEnter方法中会创建新的context对象

protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
        	//获取上下文node缓存,key为context的name
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            //获取上下文入口EntranceNode
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                    	//二次校验
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                            	//创建入口EntranceNode,每个上下文都有唯一的一个入口
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // 将创建的entranceNode挂载到根node下
                                Constants.ROOT.addChild(node);
								//将新创建的node加入到缓冲中
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            //创建上下文
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

上述代码就是context的创建的全过程,每个context都是线程本地变量,并且都会关联一个EntranceNode,并将其挂载根node节点下面。context对象创建完毕后就会创建ProcessorSlotChain对象,我们回到上面entryWithPriority方法中的lookProcessChain方法

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
		//先从缓存中查询获取对象
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
					//通过SPI的方式创建对象
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

ProcessorSlotChain对象的创建是通过spi的方式,其接口实现类定义在如下文件中,将文件中的对象按照约定顺序组织起来就形成了ProcessorSlotChain对象,具体就不深入进去了。当相关的对象都创建好了以后就是具体的规则校验了,回到entryWithPriority方法中的chain.entry(context, resourceWrapper, null, count, prioritized, args)这行代码来,此方法就是规则校验的入口。进入处理链的一个对象是NodeSelectorSlot,来看看源码

@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

   	//DefaultNode缓存,key为context的名称
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
       
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                   	//创建node
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

这段代码还是比较简单,根据上下文的名字获取DefalutNode,如果没有则创建。这里先梳理一下context、entry、ProcesssorSlotChain、和DefaultNode的关系。

  • context代表的是一个上下文,每个entry都是在某个具体的context下运行的,同时每个context都会有一个EntranceNode;
  • 每个entry都关联一个具体resource;
  • 每个resource都会有一个ProcesssorSlotChain来做规则校验;
  • 每个ProcessSlotChain可以包含多个DefaultNode,但只会有一个clusterNode;

node的关系图如下(来自官网): EntranceNode1、EntranceNode2分别代表两个上下文环境的入口node。处理链中第二个对象是ClusterBuilderSlot,此对象的作用维护ClusterNode和originNode,clusterNode的作用是针对resource的所有上下文来统计的,originNode的作用是针对具有origin属性的entry来说的,来看看主要代码

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    //创建clusterNode
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
       	//node为当前上下文的entranceNode
        node.setClusterNode(clusterNode);

      	//如果上下文设置了origin属性
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

接下来的处理器就是LogSlot,此类是功能主要就是记录异常的日志,这里就不细说了。下一个处理器StatisticSlot是非常重要的一个对象,它维护着流量、线程、异常等统计信息。代码如下:

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 后续流控、熔断降级等校验
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 如果校验通过会走到这里,增加线程和pass的计数统计
            node.increaseThreadNum();
            node.addPassRequest(count);
            //如果设置了origin,增加originNode统计
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
			//增加全局计数统计
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // 调用回调
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // 增加block计数统计
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

接下来的SystemSlot和AuthoritySlot这里就不做介绍了。重点说下FlowSlot这个类,整个流控的校验都是在这里面进行的。代码比较简单:

  @Override
  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
      checkFlow(resourceWrapper, context, node, count, prioritized);

      fireEntry(context, resourceWrapper, node, count, prioritized, args);
  }

进入checkFlow方法,最终会调用FlowRuleChecker的checkFlow方法:

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        //获取流控规则,从这里可以看出,如果设置了多个规则,会逐一校验每一个规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

进入canPassCheck方法,

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        //如果是集群模式,进行集群模式校验
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
		//本地校验
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
    
     private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        //选取合适的node,根据其统计技术来判断是否通过
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

最后会调用流量整形控制器的canPass方法,这里看下默认的流量整形控制器DefaultController的实现

  @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    	//获取已使用的许可
        int curCount = avgUsedTokens(node);
        //如果 当前已使用许可 + 请求许可  > 设置的数量
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                //采用滑动窗口来统计,每个窗口分割成了多个小的窗体,通过判断后续窗体的容量来进行流控
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

后续还有DegradeSlot关于熔断降级的,本章就不分析了,留在后续的文章中来进行分析。 上述的过程分析了sentinel限流的原理和主要工作流程。其核心在于ProcessorSlotChain,把握住它就可以抓住其主脉络。