Dubbo源码解析(二十六)远程调用——http协议

348 阅读4分钟

远程调用——http协议

目标:介绍远程调用中跟http协议相关的设计和实现,介绍dubbo-rpc-http的源码。

前言

基于HTTP表单的远程调用协议,采用 Spring 的HttpInvoker实现,关于http协议就不用多说了吧。

源码分析

(一)HttpRemoteInvocation

该类继承了RemoteInvocation类,是在RemoteInvocation上增加了泛化调用的参数设置,以及增加了dubbo本身需要的附加值设置。

public class HttpRemoteInvocation extends RemoteInvocation {

    private static final long serialVersionUID = 1L;
    /**
     * dubbo的附加值名称
     */
    private static final String dubboAttachmentsAttrName = "dubbo.attachments";

    public HttpRemoteInvocation(MethodInvocation methodInvocation) {
        super(methodInvocation);
        // 把附加值加入到会话域的属性里面
        addAttribute(dubboAttachmentsAttrName, new HashMap<String, String>(RpcContext.getContext().getAttachments()));
    }

    @Override
    public Object invoke(Object targetObject) throws NoSuchMethodException, IllegalAccessException,
            InvocationTargetException {
        // 获得上下文
        RpcContext context = RpcContext.getContext();
        // 获得附加值
        context.setAttachments((Map<String, String>) getAttribute(dubboAttachmentsAttrName));

        // 泛化标志
        String generic = (String) getAttribute(Constants.GENERIC_KEY);
        // 如果不为空,则设置泛化标志
        if (StringUtils.isNotEmpty(generic)) {
            context.setAttachment(Constants.GENERIC_KEY, generic);
        }
        try {
            // 调用下一个调用链
            return super.invoke(targetObject);
        } finally {
            context.setAttachments(null);

        }
    }
}

(二)HttpProtocol

该类是http实现的核心,跟我在《dubbo源码解析(二十五)远程调用——hessian协议》中讲到的HessianProtocol实现有很多地方相似。

1.属性

/**
 * 默认的端口号
 */
public static final int DEFAULT_PORT = 80;

/**
 * http服务器集合
 */
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();

/**
 * Spring HttpInvokerServiceExporter 集合
 */
private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();

/**
 * HttpBinder对象
 */
private HttpBinder httpBinder;

2.doExport

@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
    // 获得ip地址
    String addr = getAddr(url);
    // 获得http服务器
    HttpServer server = serverMap.get(addr);
    // 如果服务器为空,则重新创建服务器,并且加入到集合
    if (server == null) {
        server = httpBinder.bind(url, new InternalHandler());
        serverMap.put(addr, server);
    }

    // 获得服务path
    final String path = url.getAbsolutePath();

    // 加入集合
    skeletonMap.put(path, createExporter(impl, type));

    // 通用path
    final String genericPath = path + "/" + Constants.GENERIC_KEY;

    // 添加泛化的服务调用
    skeletonMap.put(genericPath, createExporter(impl, GenericService.class));
    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}

该方法是暴露服务等逻辑,因为dubbo实现http协议采用了Spring 的HttpInvoker实现,所以调用了createExporter方法来创建创建HttpInvokerServiceExporter。

3.createExporter

private <T> HttpInvokerServiceExporter createExporter(T impl, Class<?> type) {
    // 创建HttpInvokerServiceExporter
    final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
    // 设置要访问的服务的接口
    httpServiceExporter.setServiceInterface(type);
    // 设置服务实现
    httpServiceExporter.setService(impl);
    try {
        // 在BeanFactory设置了所有提供的bean属性,初始化bean的时候执行,可以针对某个具体的bean进行配
        httpServiceExporter.afterPropertiesSet();
    } catch (Exception e) {
        throw new RpcException(e.getMessage(), e);
    }
    return httpServiceExporter;
}

该方法是创建一个spring 的HttpInvokerServiceExporter。

4.doRefer

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
    // 获得泛化配置
    final String generic = url.getParameter(Constants.GENERIC_KEY);
    // 是否为泛化调用
    final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);

    // 创建HttpInvokerProxyFactoryBean
    final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
    // 设置RemoteInvocation的工厂类
    httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
        /**
         * 为给定的AOP方法调用创建一个新的RemoteInvocation对象。
         * @param methodInvocation
         * @return
         */
        @Override
        public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
            // 新建一个HttpRemoteInvocation
            RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation);
            // 如果是泛化调用
            if (isGeneric) {
                // 设置标志
                invocation.addAttribute(Constants.GENERIC_KEY, generic);
            }
            return invocation;
        }
    });
    // 获得identity message
    String key = url.toIdentityString();
    // 如果是泛化调用
    if (isGeneric) {
        key = key + "/" + Constants.GENERIC_KEY;
    }
    // 设置服务url
    httpProxyFactoryBean.setServiceUrl(key);
    // 设置服务接口
    httpProxyFactoryBean.setServiceInterface(serviceType);
    // 获得客户端参数
    String client = url.getParameter(Constants.CLIENT_KEY);
    if (client == null || client.length() == 0 || "simple".equals(client)) {
        // 创建SimpleHttpInvokerRequestExecutor连接池 使用的是JDK HttpClient
        SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
            @Override
            protected void prepareConnection(HttpURLConnection con,
                                             int contentLength) throws IOException {
                super.prepareConnection(con, contentLength);
                // 设置读取超时时间
                con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
                // 设置连接超时时间
                con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
            }
        };
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else if ("commons".equals(client)) {
        // 创建 HttpComponentsHttpInvokerRequestExecutor连接池 使用的是Apache HttpClient
        HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
        // 设置读取超时时间
        httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        // 设置连接超时时间
        httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else {
        throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
    }
    httpProxyFactoryBean.afterPropertiesSet();
    // 返回HttpInvokerProxyFactoryBean对象
    return (T) httpProxyFactoryBean.getObject();
}

该方法是服务引用的方法,其中根据url配置simple还是commons来选择创建连接池的方式。其中的区别就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。

5.getErrorCode

@Override
protected int getErrorCode(Throwable e) {
    if (e instanceof RemoteAccessException) {
        e = e.getCause();
    }
    if (e != null) {
        Class<?> cls = e.getClass();
        if (SocketTimeoutException.class.equals(cls)) {
            // 返回超时异常
            return RpcException.TIMEOUT_EXCEPTION;
        } else if (IOException.class.isAssignableFrom(cls)) {
            // 返回网络异常
            return RpcException.NETWORK_EXCEPTION;
        } else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
            // 返回序列化异常
            return RpcException.SERIALIZATION_EXCEPTION;
        }
    }
    return super.getErrorCode(e);
}

该方法是处理异常情况,设置错误码。

6.InternalHandler

private class InternalHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        // 获得请求uri
        String uri = request.getRequestURI();
        // 获得服务暴露者HttpInvokerServiceExporter对象
        HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
        // 如果不是post,则返回码设置500
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        } else {
            // 远程地址放到上下文
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            try {
                // 调用下一个调用
                skeleton.handleRequest(request, response);
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}

该内部类实现了HttpHandler,做了设置远程地址的逻辑。

后记

该部分相关的源码解析地址:github.com/CrazyHZM/in…

该文章讲解了远程调用中关于http协议的部分,内容比较简单,可以参考着官方文档了解一下。接下来我将开始对rpc模块关于injvm本地调用部分进行讲解。