阅读 89

手写一个RPC框架

RPC是什么

RPC(Remote Procedure Call Protocol)远程过程调用协议,一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP和UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

与本地调用的区别

远程调用就像异地恋。本地调用就如在身边。

远程调用中需要通过网络

响应要慢几个数量级。不那么可靠。

RPC C/S模式

RPC的三个过程

  1. 通信协议

  2. 寻址

  3. 序列化(将对象转换成网络可以传输的二进制)

为什么要用RPC

业务驱动使用。

  1. 微服务

  2. 分布式系统架构

  3. 服务可重用

  4. 系统间相互调用

RPC与其他协议的区别

  1. RMI远程方法调用是OOP领域中RPC的一种具体实现
  2. 我们熟悉的webservice、restfull接口调用是RPC吗?
  3. 都是RPC,只是消息的组织方式、消息协议不同罢了。

RPC特性和使用场景

与MQ作对比

在架构上,RPC和MQ的差异是,MQ是有Queue,可以把消息存储

RPC的特性:同步调用,对于要等待返回的场景,RPC可以自然使用(也可以异步调用)同步会有阻塞等待,线程消耗。异步不能请求暂存,压力会传到服务Provider.

MQ的特性

  1. MQ有Queue中转消息、可以持久化、可以做集群。

RPC协议是什么

RPC调用过程中需要将参数编组为消息进行发送,接收方需要解组消息参数。消息由哪些部分构成及消息的形式就构成消息协议。RPC调用过程中采用的消息协议称为RPC协议。

RPC协议规定请求、响应消息的格式。在TCP之上我们可以选用或自定义消息协议来完成我们的RPC消息交互。可以选用通用的标准协议,如:http、https。

RPC框架

传统的webservice框架:Apache CXF、Apache Axis2、JAX-ws大于基于SOAP协议。

新兴的微服务框架:Dubbo、springcloud、Apache Thrift、ICE、GRPC。

RPC框架的核心

  1. 服务暴露
  2. 远程对象调用(使用动态代理JDK或字节码的)
  3. 通信(协议、消息ID、IO方法、多连接、心跳)
  4. 序列化(序列化方式、元信息、编码内容)

手写RPC

用户使用RPC框架开发过程时需要做什么?

  1. 定义过程定义接口
  2. 服务端实现过程
  3. 客户端使用生成的stub代理对象

上干货

客户端生成过程接口的代理对象,用JDK动态代理生成接口代理对象。

定义的接口

package edu.dongnao.dnrpc.simple.example;
/**
 * StudentService
 * 
 */
public interface StudentService {
	/**
	   *   获取信息
	 * @return
	 */
	public Student getInfo();
	
	public boolean printInfo(Student student);
}


复制代码

接口的实现

package edu.dongnao.dnrpc.simple.example;

import edu.dongnao.dnrpc.simple.Service;

/**
 * StudentServiceImpl
 * 
 */
@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {

	public Student getInfo() {
		Student person = new Student();
		person.setAge(18);
		person.setName("arrylist");
		person.setSex("女");
		return person;
	}

	public boolean printInfo(Student person) {
		if (person != null) {
			System.out.println(person);
			return true;
		}
		return false;
	}
	
	public static void main(String[] args) {
		new Thread(()->{
			System.out.println("111");
		}).start();;
	}
}

复制代码

对象 实现序列化

package edu.dongnao.dnrpc.simple.example;

import java.io.Serializable;

/**
 * Student
 * 
 */
public class Student implements Serializable{
	
	private static final long serialVersionUID = 1L;
	
	private String name;
	private int age;
	private String sex;
	
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	public String getSex() {
		return sex;
	}
	public void setSex(String sex) {
		this.sex = sex;
	}
	
	@Override
	public String toString() {
		return "Student [name=" + name + ", age=" + age + ", sex=" + sex + "]";
	}
}


复制代码

编码

package edu.dongnao.dnrpc.simple;

import java.io.Serializable;

/**
 * RpcRequest
 * Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
 */
public class RpcRequest implements Serializable{
	private static final long serialVersionUID = 1L;
	// 需要请求的类名
	private String className;
	
	// 需求请求的方法名
    private String methodName;
    
    // 请求方法的参数类型
    private Class<?>[] paramTypes;
    
    // 请求的参数值
    private Object[] params;
	
	public String getClassName() {
		return className;
	}
	public void setClassName(String className) {
		this.className = className;
	}
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getParamTypes() {
		return paramTypes;
	}
	public void setParamTypes(Class<?>[] paramTypes) {
		this.paramTypes = paramTypes;
	}
	public Object[] getParams() {
		return params;
	}
	public void setParams(Object[] params) {
		this.params = params;
	}
 
}


复制代码

解码

package edu.dongnao.dnrpc.simple;

import java.io.Serializable;

/**
 * RpcResponse
 * Rpc服务端响应结果包装类,在网络上进行传输。
 */
public class RpcResponse implements Serializable {
	private static final long serialVersionUID = 1L;
	// 可能抛出的异常
	private Throwable error;
	// 响应的内容或结果
    private Object result;
    
	public Throwable getError() {
		return error;
	}
	public void setError(Throwable error) {
		this.error = error;
	}
	public Object getResult() {
		return result;
	}
	public void setResult(Object result) {
		this.result = result;
	}
    
    
}


复制代码

客户端请求

package edu.dongnao.dnrpc.simple;

import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
 * RpcClient
 * Rpc客户端,代表业务代码作为客户端,往远端服务发起请求。
 */
public class RpcClient {
	
	/**
	 * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
	 * 
	 * @param request 将要发送的请求数据
	 * @param host 远端服务域名或者ip地址
	 * @param port 远端服务端口号
	 * @return 服务端响应结果
	 * @throws Throwable 抛出的异常
	 */
	public Object start(RpcRequest request, String host, int port) throws Throwable{
		// 打开远端服务连接
		Socket server = new Socket(host, port);
		
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		
		try {
			// 1. 服务端输出流,写入请求数据,发送请求数据
			oout = new ObjectOutputStream(server.getOutputStream());
			oout.writeObject(request);
			oout.flush();
			
			// 2. 服务端输入流,获取返回数据,转换参数类型
			// 类似于反序列化的过程
			oin = new ObjectInputStream(server.getInputStream());
			Object res = oin.readObject();
			RpcResponse response = null;
			if(!(res instanceof RpcResponse)){
				throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
			}else{
				response = (RpcResponse) res;
			}
			
			// 3. 返回服务端响应结果
			if(response.getError() != null){ // 服务器产生异常
				throw response.getError();
			}
			return response.getResult();
		}finally{
			try {	// 清理资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(server != null) server.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}


复制代码

客户端代理

package edu.dongnao.dnrpc.simple;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * RpcClientProxy
 * 客户端代理服务,客户端往服务端发起的调用将通过客户端代理来发起
 */
public class RpcClientProxy implements InvocationHandler{
	private String host;	// 服务端地址
	private int port;		// 服务端口号
	
	public RpcClientProxy(String host, int port){
		this.host = host;
		this.port = port;
	}
	
	/**
	 * 生成业务接口的代理对象,代理对象做的事情,在invoke方法中。
	 * @param clazz 代理类型(接口)
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> clazz){
		// clazz 不是接口不能使用JDK动态代理
		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
	}
	
	/**
	 * 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
	 */
	public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
		
		// 调用前
		System.out.println("执行远程方法前,可以做些事情");
		
		// 封装参数,类似于序列化的过程
		RpcRequest request = new RpcRequest();
		request.setClassName(method.getDeclaringClass().getName());
		request.setMethodName(method.getName());
		request.setParamTypes(method.getParameterTypes());
		request.setParams(params);
		// 链接服务器调用服务
		RpcClient client = new RpcClient();
		Object rst = client.start(request, host, port);
		
		// 调用后
		System.out.println("执行远程方法后,也可以做些事情");
		return rst;
	}
}


复制代码

服务提供者

package edu.dongnao.dnrpc.simple;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * RpcServer
 * Rpc服务提供者
 */
public class RpcServer {
	
	/**
	 * 启动指定的网络端口号服务,并监听端口上的请求数据。获得请求数据以后将请求信息委派给服务处理器,放入线程池中执行。
	 * @param port 监听端口
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 */
	public void start(int port, String clazz) {
		ServerSocket server = null;
		try {
			// 1. 创建服务端指定端口的socket连接
			server = new ServerSocket(port);
			// 2. 获取所有rpc服务类
			Map<String, Object> services = getService(clazz);
			// 3. 创建线程池
			Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
			while(true){
				// 4. 获取客户端连接
				Socket client = server.accept();
				// 5. 放入线程池中执行
				RpcServerHandler service = new RpcServerHandler(client, services);
				executor.execute(service);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			//关闭监听
			if(server != null)
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
		}
	}
	
	/**
	 * 实例化所有rpc服务类,也可用于暴露服务信息到注册中心。
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 * @return
	 */
	public Map<String,Object> getService(String clazz){
		try {
			Map<String, Object> services = new HashMap<String, Object>();
			// 获取所有服务类
			String[] clazzes = clazz.split(",");
			List<Class<?>> classes = new ArrayList<Class<?>>();
			for(String cl : clazzes){
				List<Class<?>> classList = getClasses(cl);
				classes.addAll(classList);
			}
			// 循环实例化
			for(Class<?> cla:classes){
				Object obj = cla.newInstance();
				services.put(cla.getAnnotation(Service.class).value().getName(), obj);
			}
			return services;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 获取包下所有有@Sercive注解的类
	 * @param pckgname
	 * @return
	 * @throws ClassNotFoundException
	 */
	public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
		// 需要查找的结果
		List<Class<?>> classes = new ArrayList<Class<?>>();
		// 找到指定的包目录
		File directory = null;
		try {
			ClassLoader cld = Thread.currentThread().getContextClassLoader();
			if (cld == null)
				throw new ClassNotFoundException("无法获取到ClassLoader");
			String path = pckgname.replace('.', '/');
			URL resource = cld.getResource(path);
			if (resource == null)
				throw new ClassNotFoundException("没有这样的资源:" + path);
			directory = new File(resource.getFile());
		} catch (NullPointerException x) {
			throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
		}
		if (directory.exists()) {
			// 获取包目录下的所有文件
			String[] files = directory.list();
			File[] fileList = directory.listFiles();
			// 获取包目录下的所有文件
			for (int i = 0; fileList != null && i < fileList.length; i++) {
				File file = fileList[i];
				//判断是否是Class文件
				if (file.isFile() && file.getName().endsWith(".class")) {
					Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
					if(clazz.getAnnotation(Service.class) != null){
						classes.add(clazz);
					}
				}else if(file.isDirectory()){ //如果是目录,递归查找
					List<Class<?>> result = getClasses(pckgname+"."+file.getName());
					if(result != null && result.size() != 0){
						classes.addAll(result);
					}
				}
			}
		} else{
			throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
		}
		return classes;
	}
}

复制代码

服务器请求处理

package edu.dongnao.dnrpc.simple;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
 * RpcServerHandler
 * 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
 */
public class RpcServerHandler implements Runnable {
	
	// 客户端网络请求socket,可以从中获得网络请求信息
	private Socket clientSocket;
	
	// 服务端提供处理请求的类集合
	private Map<String, Object> serviceMap;
	
	/**
	 * @param client 客户端socket
	 * @param services 所有服务
	 */
	public RpcServerHandler(Socket client, Map<String, Object> services) {
		this.clientSocket = client;
		this.serviceMap = services;
	}
 
 
	/**
	 * 读取网络中客户端请求的信息,找到请求的方法,执行本地方法获得结果,写入网络IO输出中。
	 * 
	 */
	public void run() {
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		RpcResponse response = new RpcResponse();
		try {
			// 1. 获取流以待操作
			oin = new ObjectInputStream(clientSocket.getInputStream());
			oout = new ObjectOutputStream(clientSocket.getOutputStream());
			
			// 2. 从网络IO输入流中请求数据,强转参数类型
			Object param = oin.readObject();
			RpcRequest  request = null;
			if(!(param instanceof RpcRequest)){
				response.setError(new Exception("参数错误"));
				oout.writeObject(response);
				oout.flush();
				return;
			}else{
				// 反序列化RpcRequest
				request = (RpcRequest) param;
			}
			
			// 3. 查找并执行服务方法
			Object service = serviceMap.get(request.getClassName());
			Class<?> clazz= service.getClass();
			Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
			Object result = method.invoke(service, request.getParams());
			
			// 4. 返回RPC响应,序列化RpcResponse
			response.setResult(result);
			// 序列化结果
			oout.writeObject(response);
			oout.flush();
			return;
		} catch (Exception e) {
			try {	//异常处理
				if(oout != null){
					response.setError(e);
					oout.writeObject(response);
					oout.flush();
				}
			} catch (Exception e1) {
				e1.printStackTrace();
			}
			return;
		}finally{
			try {	// 回收资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(clientSocket != null) clientSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}


复制代码

自定义注解

package edu.dongnao.dnrpc.simple;
/**
 * Service
 * 
 */

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Service
 * 一个提供了RPC服务的实现类。
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {
	/**
	 * 注解所属接口类型
	 * @return
	 */
	Class<?> value();
}


复制代码

测试类

服务端先启动

public class ServerTest {
	
	@Test
	public void startServer() {
		RpcServer server = new RpcServer();
		server.start(9998, "edu.dongnao.dnrpc.simple.example");
	}
	
	
}
复制代码

客户端

public class ClientTest {
	
	@Test
	public void test() {
		// 本地没有接口实现,通过代理获得接口实现实例
		RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9998);
		StudentService service = proxy.getProxy(StudentService.class);
		
		System.out.println(service.getInfo());
		
		Student student = new Student();
		student.setAge(23);
		student.setName("hashmap");
		student.setSex("男");
		System.out.println(service.printInfo(student));
	}
	
}
复制代码

最简单的实例就是这样了。

关注下面的标签,发现更多相似文章
评论