Dubbo之限流分析

4,124 阅读18分钟

微信公众号:房东的小黑黑
路途随遥远,将来更美好
学海无涯,大家一起加油!

在前面的一篇中分析了Dubbo是如何降级的,除了降级,有时限流也是一种很有效的解决高并发的性能问题,那在本篇中开始分析Dubbo是如何限流的。我们知道限流主要是通过控制连接数来实现的,防止某一片段内请求处理过大,导致重要服务的失效。

用法案例

服务端连接控制

限制当前提供者在使用dubbo协议最多接受10个消费者链接

<dubbo:provider protocol="dubbo" accepts="10" />

或者

<dubbo:protocol name="dubbo" accepts="10" />

并发控制
限制com.foo.BarService的每个方法,服务端并发执行(或占用线程池线程数)不能超过10个:

<dubbo:service interface="com.foo.BarService" executes="10" />

限制com.foo.BarServicesayHello方法,服务器并发执行(或占用线程池线程数)不能超过10个。

<dubbo:service interface="com.foo.BarService">
    <dubbo:method name="sayHello" executes="10" />
</dubbo:service>

actives限流

该限流方式与前两种不同,其可以设置在提供端,也可以设置在消费者端。可以设置为接口级别,也可以设置为方法级别。
根据消费者与提供者建立的连接类型,其意义也不同。

长连接: 表示当前的长连接最多可以处理的请求个数。与长连接的数量没有问题。
短连接:表示当前服务可以同时处理的短连接数量。
类级别

<dubbo:service interface="com.foo.BarService" actives="10" />
<dubbo:reference interface="com.foo.BarService" actives="10" />


方法级别

<dubbo:reference interface="com.foo.BarService">
    <dubbo:method name="sayHello" actives="10" />
</dubbo:service>
<dubbo:reference interface="com.foo.BarService">
    <dubbo:method name="sayHello" actives="10" />
</dubbo:service>


connections限流
可以设置在提供端,也可以设置在消费者端。限定连接的个数。对于短连接,和actives相同。但对于长连接,表示长连接的个数。
一般情况下,会使connections与actives联用,让connections限制长连接的个数,让actives限制长连接中可以处理的请求个数。
限制客户端服务使用连接不能超过10个

<dubbo:reference interface="com.foo.BarService" connections="10" />

<dubbo:service interface="com.foo.BarService" connections="10" />

如果<dubbo:service><dubbo:reference>都配置了connections,<dubbo:reference>优先。

延迟连接
延迟连接仅可以设置在消费者端,并且不能设置为方法级别。仅作用于Dubbo服务暴露协议。将长连接的建立推迟到消费者真正调用提供者时。 可以减少长连接的数量。

 <!--设置当前消费者对接口中的每个方法发出链接采用延迟加载-->
   <dubbo:reference id="userService"  lazy="true"
    interface="com.dubbo.service.UserService"/>
<!--设置当前消费者对所有接口中的所有方法发出链接采用延迟加载-->
    <dubbo:consumer lazy="true"></dubbo:consumer>

我们已经讲解了如何设置控制链接数的,那么它们底层是如何实现的呢?

源码分析

实际上上面的逻辑都是一个个Filter,所有的Filter会连接成一个过滤器链,每次请求都会经过整个链路中的每一个Filter。那它是在什么时候构造成一个过滤器链的呢。

在服务暴露的时候会调用buildInvokerChain, 将真正执行的invoker放到过滤链的尾部,再执行protocol.expert(buildInvokerChain(invoker, ...))方法来进行服务暴露。

在服务引用的时候会调用protocol.refer()方法先生成Invoker,再调用buildInvokerChain(protocol.refer(type, url), ...)来生成消费类型的调用链。

ExecuteLimitFilter
它用于限制每个服务中每个方法的最大并发数,有接口级别和方法级别的配置方式。

public class ExecuteLimitFilter extends ListenableFilter {

    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";

    public ExecuteLimitFilter() {
        super.listener = new ExecuteLimitListener();
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
                    url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                    "\" /> limited.");
        }

        invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }

    static class ExecuteLimitListener implements Listener {
        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
            return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
        }
    }
}
public class RpcStatus {
    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();

    //......

    public static void beginCount(URL url, String methodName) {
        beginCount(url, methodName, Integer.MAX_VALUE);
    }

    public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.incrementAndGet() > max) {
            methodStatus.active.decrementAndGet();
            return false;
        } else {
            appStatus.active.incrementAndGet();
            return true;
        }
    }

    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }

    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }

    //......
}

其基本原理:在框架中使用一个ConcurrentMap缓存了并发数的计数器,为每个请求URL生成一个IdentityString,并以此为key;再将每个IdentityString生成一个RpcStatus对象,将此作为value。RpcStatus对象用于记录对应的并发数。在调用开始之前,会通过URL获得RpcStatus对象,把对象中的并发数计数器原子+1,在finally中再将原子减1。只要在计数器+1的时候,发现当前计数器比设置的并发数大时,就会抛出异常。

TpsLimitFilter

public class DefaultTPSLimiter implements TPSLimiter {

    /**
     * 每个Service维护一个计数器
     */

    private final ConcurrentMap<String, StatItem> stats
            = new ConcurrentHashMap<String, StatItem>();

    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        //servicekey并没有和方法绑定,只能限流接口
        String serviceKey = url.getServiceKey();
        if (rate > 0) {
            StatItem statItem = stats.get(serviceKey);
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
        }

        return true;
    }
}
class StatItem {

    //接口名
    private String name;

    //计数周期开始
    private long lastResetTime;

    //计数间隔
    private long interval;

    //剩余计数请求数
    private AtomicInteger token;

    //总共允许请求数
    private int rate;

    StatItem(String name, int rate, long interval) {
        this.name = name;
        this.rate = rate;
        this.interval = interval;
        this.lastResetTime = System.currentTimeMillis();
        this.token = new AtomicInteger(rate);
    }

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            //乐观锁增加计数
            flag = token.compareAndSet(value, value - 1);
            //失败重新获取
            value = token.get();
        }

        return flag;
    }

    long getLastResetTime() {
        return lastResetTime;
    }

    int getToken() {
        return token.get();
    }

    @Override
    public String toString() {
        return new StringBuilder(32).append("StatItem ")
                .append("[name=").append(name).append(", ")
                .append("rate = ").append(rate).append(", ")
                .append("interval = ").append(interval).append("]")
                .toString();
    }

}

TpsLimitFilter的限流是基于令牌的,即一段时间内只分配N个令牌,每次请求都会消耗一个令牌,耗完为止,后面再来的请求都会被拒绝。
具体的逻辑是在DefaultTPSLimiter#isAllowable,会用这个方法判断是否触发限流。
在DefaultTPSLimiter内部用一个ConcurrentHashMap缓存每个接口的令牌数,key是interface+group+version,value是一个StatItem对象,它包装了令牌刷新时间间隔、每次发放的令牌数等。首先判断当前时间减去上次发放令牌的时间是否超过了时间间隔,超过了就重新发放令牌,之前剩余的令牌会被直接覆盖掉。然后,通过CAS的方式减去1令牌,减掉后小于0就会触发限流。

ActiveLimitFilter
和服务提供者的ExecuteLimitFilter相似,它是消费者端的过滤器,限制的是客户端的并发量。

但是它与ExecuteLimitFilter有所不同,它不会直接抛出异常。而是当到达阈值的时候,会先加锁抢占当前接口的RpcStatus对象,然后通过wait方法进行等待,等待是有时间的,因为请求是有timeout属性的。然后如果某个Invoker在调用结束后,并发把计数器减-1并触发一个notify,此时会有一个在wait状态的线程被唤醒并继续执行,判断现在是否超时,如果超时则抛出异常。如果当前并发数仍然超出阈值,则继续执行wait方法;如果没有超出阈值在,则跳出循环,CAS+1,并调用invoke方法,调用结束后CAS-1,最后通过notify唤醒另外一个线程。

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // 获取配置的数量
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 获取当前接口调用统计
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            //获取接口超时时间
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();// 获取并发数
            if (active >= max) {// 如果大于最大数量
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            // 挂起当前线程并释放锁,因为并发数已超过限制
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        // 通过notify唤醒了,计算挂起的时间
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {// 如果已经超过超时时间
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                                   + invoker.getInterface().getName() + ", method: "
                                                   + invocation.getMethodName() + ", elapsed: " + elapsed
                                                   + ", timeout: " + timeout + ". concurrent invokes: " + active
                                                   + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            long begin = System.currentTimeMillis();
            // 增加该方法的数量
            RpcStatus.beginCount(url, methodName);
            try {
                // 调用
                Result result = invoker.invoke(invocation);
                // 减少数量
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if(max>0){// 调用完成后调用notify唤醒在等待的线程
                synchronized (count) {
                    count.notify();
                } 
            }
        }
    }

参考文章:
Dubbo之限流TpsLimitFilter源码分析
Dubbo服务限流
Dubbo源码分析----过滤器之ActiveLimitFilter