dubbo 自适应负载均衡改造 天池中间件初赛

671 阅读4分钟

首先感谢这次第六届天池中间件大赛 dubbo的自适应负载均衡改造给予我的提升,也同样感谢让我了解dubbo改造思路的桐人前辈,适逢中秋节dubbo发来冬季外套,也写入自己的经历回馈下社区。github: github.com/complone/du…

核心算法:根据每个服务被访问的请求数计算权重,然后根据服务端传过来的信息重新分配权重

网关层(消费者端)

LocalSuitTableLoadBalance.java

package com.zoe.platform.util;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Adaptive load balancer that re-weights providers from the load information
 * they push back to the consumer (see {@code CustomerInfoManager}).
 *
 * <p>Selection happens in two stages: first every provider that has not yet
 * reported any load information is given a request; afterwards a weighted
 * random choice is made, where a provider's weight is its configured weight
 * scaled by the inverse of its reported average latency.
 *
 * @author chengxingyuan
 * @since 2019/9/4
 */
public class LocalSuitTableLoadBalance extends RandomLoadBalance {

    /**
     * Numerator used when converting an average latency into a weight
     * multiplier ({@code weight * LATENCY_SCALE / avgLatency}).
     */
    private static final int LATENCY_SCALE = 500;

    /**
     * Picks the invoker that should handle {@code invocation}.
     *
     * @param invokers   candidate invokers
     * @param url        consumer URL (unused by this strategy)
     * @param invocation the call being routed (unused by this strategy)
     * @return the chosen invoker, or {@code null} when there are no candidates
     */
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }

        // 1. Try to make sure every provider has been hit at least once.
        Invoker<T> invoker = doSelectInFreeInvokers(invokers);

        // 2. Otherwise distribute by the weights derived from provider feedback.
        return invoker != null ? invoker : doSelectWithWeigth(invokers);
    }

    /**
     * Returns the first invoker for which no load information is known yet, so
     * that every machine receives some traffic.
     *
     * @return an invoker without load information, or {@code null} if all
     *         invokers already have some
     */
    private <T> Invoker<T> doSelectInFreeInvokers(List<Invoker<T>> invokers) {
        if (CustomerInfoManager.LOAD_INFO.size() >= invokers.size()) {
            return null;
        }
        for (Invoker<T> invoker : invokers) {
            // Skip (rather than stop at) invokers that already reported, so an
            // unknown invoker later in the list is still found.
            if (CustomerInfoManager.getServerLoadInfo(invoker) == null) {
                return invoker;
            }
        }
        return null;
    }

    /**
     * Weighted random selection based on the provider's configured weight and
     * its reported average latency.
     *
     * <p>Invokers without load information, without a tracked available-thread
     * counter, or with no available threads get weight 0. If an eligible
     * invoker reports a non-positive average latency, or no invoker ends up
     * with a positive weight, a uniformly random invoker is returned.
     */
    private <T> Invoker<T> doSelectWithWeigth(List<Invoker<T>> invokers) {
        final int size = invokers.size();

        // Recomputed weight per invoker index; long avoids overflow in the products/sum.
        final long[] serviceWeight = new long[size];
        long totalWeight = 0L;

        // 1. Compute the individual and total weights.
        for (int index = 0; index < size; ++index) {
            Invoker<T> invoker = invokers.get(index);

            CustomerInfo customerInfo = CustomerInfoManager.getServerLoadInfo(invoker);
            AtomicInteger availThread = CustomerInfoManager.getAvailThread(invoker);
            if (customerInfo == null || availThread == null || availThread.get() <= 0) {
                continue;
            }

            int avgSpendTime = customerInfo.getAvgSpendTime();
            if (avgSpendTime <= 0) {
                // No usable latency sample: it may mean "very fast" or "no data",
                // so fall back to a uniformly random choice.
                return randomInvoker(invokers);
            }

            int baseWeight = customerInfo.getServerWeight();
            if (baseWeight <= 0) {
                continue;
            }

            // Multiply before dividing so slow providers are not truncated to 0,
            // and keep at least 1 so an available provider stays selectable.
            long weight = Math.max(1L, (long) baseWeight * LATENCY_SCALE / avgSpendTime);
            serviceWeight[index] = weight;
            totalWeight += weight;
        }

        if (totalWeight <= 0L) {
            // A random bound must be positive; nothing was eligible.
            return randomInvoker(invokers);
        }

        // 2. Weighted random pick.
        long offset = ThreadLocalRandom.current().nextLong(totalWeight);
        for (int i = 0; i < size; ++i) {
            offset -= serviceWeight[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }

        // Safety net; not reachable while totalWeight equals the sum of serviceWeight.
        return randomInvoker(invokers);
    }

    /** Returns a uniformly random element of a non-empty invoker list. */
    private static <T> Invoker<T> randomInvoker(List<Invoker<T>> invokers) {
        return invokers.get(ThreadLocalRandom.current().nextInt(invokers.size()));
    }
}

客户机信息的更新以及获取

package com.aliware.tianchi;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.dubbo.rpc.Invoker;

import com.aliware.tianchi.comm.CustomerInfo;

/**
 * Consumer-side registry of the load information pushed by providers.
 *
 * <p>Both maps are keyed by {@code "provider-" + quota}; lookups use the host
 * of the invoker URL, so this presumably relies on provider hosts being named
 * that way (verify against the deployment).
 *
 * @author complone
 */
public class CustomerInfoManager {

    /** Latest load information per provider key. */
    public static final Map<String, CustomerInfo> LOAD_INFO = new ConcurrentHashMap<>();

    /** Consumer-side counter of available threads per provider key. */
    private static final Map<String, AtomicInteger> AVAIL_MAP = new ConcurrentHashMap<>();

    /** Number of leading comma-separated fields {@link #updateInfo} reads. */
    private static final int REQUIRED_FIELDS = 4;

    /**
     * Looks up the load information recorded for the invoker's host.
     *
     * @return the recorded information, or {@code null} if none was received yet
     */
    public static CustomerInfo getServerLoadInfo(Invoker<?> invoker) {
        return LOAD_INFO.get(invoker.getUrl().getHost());
    }

    /**
     * Applies one provider notification.
     *
     * @param notiftStr comma-separated message whose first four fields are
     *                  {@code quota,providerThreads,activeCount,avgTime};
     *                  further fields are ignored
     * @throws IllegalArgumentException if the message is {@code null}, has
     *         fewer than four fields, or a numeric field cannot be parsed
     *         ({@link NumberFormatException})
     */
    public static void updateInfo(String notiftStr) {
        if (notiftStr == null) {
            throw new IllegalArgumentException("provider notification must not be null");
        }
        final String[] fields = notiftStr.split(",");
        if (fields.length < REQUIRED_FIELDS) {
            throw new IllegalArgumentException(
                    "provider notification needs at least " + REQUIRED_FIELDS + " fields: " + notiftStr);
        }

        final String quota = fields[0];
        final int providerThread = Integer.parseInt(fields[1]);
        final int activeCount = Integer.parseInt(fields[2]);
        final int avgTime = Integer.parseInt(fields[3]);
        final String key = "provider-" + quota;

        // Atomic create-if-missing: concurrent notifications for the same key
        // must not replace an instance other threads already hold.
        final CustomerInfo customerInfo =
                LOAD_INFO.computeIfAbsent(key, k -> new CustomerInfo(quota, providerThread));
        customerInfo.getAllActiveCount().set(activeCount);
        customerInfo.setAvgSpendTime(avgTime);

        // The counter is only seeded from the first notification; afterwards it
        // is adjusted through the AtomicInteger handed out by getAvailThread.
        final int availCount = customerInfo.getProviderAllThreads() - activeCount;
        AVAIL_MAP.computeIfAbsent(key, k -> new AtomicInteger(availCount));
    }

    /**
     * Returns the available-thread counter for the invoker's host.
     *
     * @return the shared counter, or {@code null} if no notification was
     *         received for that host yet
     */
    public static AtomicInteger getAvailThread(Invoker<?> invoker) {
        return AVAIL_MAP.get(invoker.getUrl().getHost());
    }

}

在返回回调函数之前,通过消费者端过滤器(可设置过滤器的优先级)维护消费者接收到的各客户机可用线程数:发起调用前将可用线程数减一,异步结果完成后在回调中加一

package com.aliware.tianchi;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;

/**
 * Consumer-side filter that keeps the per-provider available-thread counter
 * from {@code CustomerInfoManager} in step with in-flight calls: the counter
 * is decremented before the call and incremented again once the call is over.
 *
 * @author complone
 */
@Activate(group = Constants.CONSUMER)
public class TestClientFilter implements Filter {

    /**
     * Invokes the call while accounting for it in the provider's counter.
     *
     * <p>If no counter exists for the invoker the call is passed through
     * untouched. For an {@link AsyncRpcResult} the counter is restored when the
     * result future completes; for any other result, or if the invocation
     * throws, it is restored immediately so the counter cannot leak.
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        AtomicInteger availThread = CustomerInfoManager.getAvailThread(invoker);
        if (availThread == null) {
            return invoker.invoke(invocation);
        }

        availThread.decrementAndGet();
        // Stays true until the restore has been handed over to the async callback.
        boolean restoreNow = true;
        try {
            Result result = invoker.invoke(invocation);
            if (result instanceof AsyncRpcResult) {
                AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                asyncResult.getResultFuture().whenComplete((actual, t) -> availThread.incrementAndGet());
                restoreNow = false;
            }
            return result;
        } finally {
            if (restoreNow) {
                availThread.incrementAndGet();
            }
        }
    }

    /** Returns the result unchanged. */
    @Override
    public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        return result;
    }
}

消费者回调线程监听函数

/**
 * Consumer-side callback listener: forwards every message received from a
 * provider to {@code CustomerInfoManager.updateInfo}.
 */
public class CallbackListenerImpl implements CallbackListener {

    /**
     * Hands the raw provider message to {@code CustomerInfoManager.updateInfo}.
     *
     * @param msg message pushed by the provider
     */
    @Override
    public void receiveServerMsg(String msg) {
        CustomerInfoManager.updateInfo(msg);
    }

}

提供者端

回调客户机信息

package com.aliware.tianchi;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.rpc.listener.CallbackListener;
import org.apache.dubbo.rpc.service.CallbackService;

import com.aliware.tianchi.comm.CustomerInfo;

import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Provider-side callback service that pushes the provider's load information
 * to every registered listener once per second, and once immediately when a
 * listener registers.
 *
 * @author complone
 */
public class CallbackServiceImpl implements CallbackService {

    /**
     * Thread count reported when the dubbo protocol config is missing or does
     * not set {@code threads}; 200 is Dubbo's documented default pool size.
     */
    private static final int DEFAULT_PROVIDER_THREADS = 200;

    private final Timer timer = new Timer();

    /**
     * key: listener type
     * value: callback listener
     */
    private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<>();

    /** Starts the periodic push (no initial delay, 1000 ms period). */
    public CallbackServiceImpl() {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (listeners.isEmpty()) {
                    return;
                }
                for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) {
                    try {
                        entry.getValue().receiveServerMsg(getInfo());
                    } catch (Throwable t1) {
                        // A listener that cannot be notified is dropped.
                        listeners.remove(entry.getKey());
                    }
                }
                // Start a fresh statistics window after each push.
                ProviderManager.resetTime();
            }
        }, 0, 1000);
    }

    /**
     * Builds the message
     * {@code quota,providerThreads,activeCount,avgTime,reqCount} from the
     * current provider statistics. {@code avgTime} is total spend time divided
     * by request count, or 0 when no request was counted.
     */
    private String getInfo() {
        CustomerInfo customerInfo = ProviderManager.getServerInfo();
        Optional<ProtocolConfig> protocolConfig = ConfigManager.getInstance().getProtocol(Constants.DUBBO_PROTOCOL);
        String env = System.getProperty("quota");

        // Tolerate both a missing protocol config and an unset thread count
        // instead of failing with NoSuchElementException / NullPointerException.
        int providerThread = protocolConfig
                .map(ProtocolConfig::getThreads)
                .orElse(DEFAULT_PROVIDER_THREADS);

        long allSpendTimeTotal = customerInfo.getAllSpendTimeTotal().get();
        long allReqCount = customerInfo.getAllReqCount().get();
        long allActiveCount = customerInfo.getAllActiveCount().get();
        long allAvgTime = allReqCount != 0 ? allSpendTimeTotal / allReqCount : 0;

        StringBuilder info = new StringBuilder();
        info.append(env).append(",")
                .append(providerThread).append(",")
                .append(allActiveCount).append(",")
                .append(allAvgTime).append(",")
                .append(allReqCount);
        return info.toString();
    }

    /**
     * Registers {@code listener} under {@code key} and immediately sends it the
     * current load information.
     */
    @Override
    public void addListener(String key, CallbackListener listener) {
        listeners.put(key, listener);
        listener.receiveServerMsg(getInfo());
    }

}

获取客户机当前信息,统计请求的平均时间

package com.aliware.tianchi;

import com.aliware.tianchi.comm.CustomerInfo;

/**
 * Holder of the provider-wide request statistics (active count, request
 * count and accumulated spend time) kept in a single shared
 * {@code CustomerInfo}.
 *
 * @author complone
 */
public class ProviderManager {

    private static final CustomerInfo SERVER_INFO = new CustomerInfo();

    /** Returns the shared statistics object. */
    public static CustomerInfo getServerInfo() {
        return SERVER_INFO;
    }

    /** Marks the start of a request: one more active request. */
    public static void startTime() {
        final CustomerInfo stats = SERVER_INFO;
        stats.getAllActiveCount().incrementAndGet();
    }

    /**
     * Marks the end of a request: one fewer active request, one more counted
     * request, and {@code expend} added to the accumulated spend time.
     *
     * @param expend    time spent on the request
     * @param succeeded outcome of the request (currently not used)
     */
    public static void endTime(long expend, boolean succeeded) {
        final CustomerInfo stats = SERVER_INFO;
        stats.getAllActiveCount().decrementAndGet();
        stats.getAllReqCount().incrementAndGet();
        stats.getAllSpendTimeTotal().addAndGet(expend);
    }

    /** Clears the accumulated spend time and the request count. */
    public static void resetTime() {
        final CustomerInfo stats = SERVER_INFO;
        stats.getAllSpendTimeTotal().set(0L);
        stats.getAllReqCount().set(0L);
    }
}

访问提供者的请求限流:线程池任务队列满之后会触发拒绝请求,思路是让该线程自旋,然后抛出 rpc 异常。(注意:下面给出的示例实现并未真正限流,tryAcquire 始终返回 true。)

package com.aliware.tianchi;

import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.RequestLimiter;

/**@author complone
 *
 * Server-side rate limiting.
 * Optional interface.
 * Extension point invoked before a request is submitted to the backend thread
 * pool; it can be used by the server to reject requests.
 */
public class TestRequestLimiter implements RequestLimiter {

    /**
     * This implementation always returns {@code true}, i.e. it never limits.
     *
     * @param request the service request
     * @param activeTaskCount number of active threads in the corresponding server-side thread pool
     * @return  false: the request is not submitted to the server's business thread pool and returns
     *          directly; the client can catch the RpcException in a Filter
     *          true: no limiting
     */
    @Override
    public boolean tryAcquire(Request request, int activeTaskCount) {
        return true;
    }

}

统计提供者端处理每一次请求的耗时(即响应时间 RT,而非 TPS),用于计算平均耗时

package com.aliware.tianchi;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;

/**
 * Provider-side filter that records, for every request, the active-request
 * bookkeeping and the elapsed time in {@code ProviderManager}.
 *
 * @author complone
 */
@Activate(group = Constants.PROVIDER)
public class TestServerFilter implements Filter {

    private static final long NANOS_PER_MILLI = 1_000_000L;

    /**
     * Invokes the call and reports its duration in milliseconds.
     *
     * <p>The duration is taken from {@link System#nanoTime()} so it is not
     * affected by wall-clock adjustments. The call counts as successful only
     * when it returns a result that carries no exception.
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        // Outside the try block: endTime must only undo a startTime that happened.
        ProviderManager.startTime();

        final long begin = System.nanoTime();
        boolean isSuccess = false;
        try {
            Result result = invoker.invoke(invocation);
            isSuccess = result != null && !result.hasException();
            return result;
        } finally {
            long elapsedMillis = (System.nanoTime() - begin) / NANOS_PER_MILLI;
            ProviderManager.endTime(elapsedMillis, isSuccess);
        }
    }

    /** Returns the result unchanged. */
    @Override
    public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        return result;
    }

}