RPC是什么
RPC(Remote Procedure Call Protocol)远程过程调用协议,一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP和UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
与本地调用的区别
远程调用就像异地恋。本地调用就如在身边。
远程调用中需要通过网络
响应要慢几个数量级。不那么可靠。
RPC C/S模式
RPC的三个过程
-
通信协议
-
寻址
-
序列化(将对象转换成网络可以传输的二进制)
为什么要用RPC
业务驱动使用。
-
微服务
-
分布式系统架构
-
服务可重用
-
系统间相互调用
RPC与其他协议的区别
- RMI远程方法调用是OOP领域中RPC的一种具体实现
- 我们熟悉的webservice、restfull接口调用是RPC吗?
- 都是RPC,只是消息的组织方式、消息协议不同罢了。
RPC特性和使用场景
与MQ作对比
在架构上,RPC和MQ的差异是,MQ是有Queue,可以把消息存储
RPC的特性:同步调用,对于要等待返回的场景,RPC可以自然使用(也可以异步调用)同步会有阻塞等待,线程消耗。异步不能请求暂存,压力会传到服务Provider.
MQ的特性
- MQ有Queue中转消息、可以持久化、可以做集群。
RPC协议是什么
RPC调用过程中需要将参数编组为消息进行发送,接收方需要解组消息参数。消息由哪些部分构成及消息的形式就构成消息协议。RPC调用过程中采用的消息协议称为RPC协议。
RPC协议规定请求、响应消息的格式。在TCP之上我们可以选用或自定义消息协议来完成我们的RPC消息交互。可以选用通用的标准协议,如:http、https。
RPC框架
传统的webservice框架:Apache CXF、Apache Axis2、JAX-ws大于基于SOAP协议。
新兴的微服务框架:Dubbo、springcloud、Apache Thrift、ICE、GRPC。
RPC框架的核心
- 服务暴露
- 远程对象调用(使用动态代理JDK或字节码的)
- 通信(协议、消息ID、IO方法、多连接、心跳)
- 序列化(序列化方式、元信息、编码内容)
手写RPC
用户使用RPC框架开发过程时需要做什么?
- 定义过程定义接口
- 服务端实现过程
- 客户端使用生成的stub代理对象
上干货
客户端生成过程接口的代理对象,用JDK动态代理生成接口代理对象。
定义的接口
package edu.dongnao.dnrpc.simple.example;
/**
* StudentService
*
*/
public interface StudentService {
/**
* 获取信息
* @return
*/
public Student getInfo();
public boolean printInfo(Student student);
}
接口的实现
package edu.dongnao.dnrpc.simple.example;
import edu.dongnao.dnrpc.simple.Service;
/**
* StudentServiceImpl
*
*/
@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {
public Student getInfo() {
Student person = new Student();
person.setAge(18);
person.setName("arrylist");
person.setSex("女");
return person;
}
public boolean printInfo(Student person) {
if (person != null) {
System.out.println(person);
return true;
}
return false;
}
public static void main(String[] args) {
new Thread(()->{
System.out.println("111");
}).start();;
}
}
对象 实现序列化
package edu.dongnao.dnrpc.simple.example;
import java.io.Serializable;
/**
* Student
*
*/
public class Student implements Serializable{
private static final long serialVersionUID = 1L;
private String name;
private int age;
private String sex;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
@Override
public String toString() {
return "Student [name=" + name + ", age=" + age + ", sex=" + sex + "]";
}
}
编码
package edu.dongnao.dnrpc.simple;
import java.io.Serializable;
/**
* RpcRequest
* Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
*/
public class RpcRequest implements Serializable{
private static final long serialVersionUID = 1L;
// 需要请求的类名
private String className;
// 需求请求的方法名
private String methodName;
// 请求方法的参数类型
private Class<?>[] paramTypes;
// 请求的参数值
private Object[] params;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
解码
package edu.dongnao.dnrpc.simple;
import java.io.Serializable;
/**
* RpcResponse
* Rpc服务端响应结果包装类,在网络上进行传输。
*/
public class RpcResponse implements Serializable {
private static final long serialVersionUID = 1L;
// 可能抛出的异常
private Throwable error;
// 响应的内容或结果
private Object result;
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
客户端请求
package edu.dongnao.dnrpc.simple;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* RpcClient
* Rpc客户端,代表业务代码作为客户端,往远端服务发起请求。
*/
public class RpcClient {
/**
* 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
*
* @param request 将要发送的请求数据
* @param host 远端服务域名或者ip地址
* @param port 远端服务端口号
* @return 服务端响应结果
* @throws Throwable 抛出的异常
*/
public Object start(RpcRequest request, String host, int port) throws Throwable{
// 打开远端服务连接
Socket server = new Socket(host, port);
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
try {
// 1. 服务端输出流,写入请求数据,发送请求数据
oout = new ObjectOutputStream(server.getOutputStream());
oout.writeObject(request);
oout.flush();
// 2. 服务端输入流,获取返回数据,转换参数类型
// 类似于反序列化的过程
oin = new ObjectInputStream(server.getInputStream());
Object res = oin.readObject();
RpcResponse response = null;
if(!(res instanceof RpcResponse)){
throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
}else{
response = (RpcResponse) res;
}
// 3. 返回服务端响应结果
if(response.getError() != null){ // 服务器产生异常
throw response.getError();
}
return response.getResult();
}finally{
try { // 清理资源,关闭流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(server != null) server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端代理
package edu.dongnao.dnrpc.simple;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* RpcClientProxy
* 客户端代理服务,客户端往服务端发起的调用将通过客户端代理来发起
*/
public class RpcClientProxy implements InvocationHandler{
private String host; // 服务端地址
private int port; // 服务端口号
public RpcClientProxy(String host, int port){
this.host = host;
this.port = port;
}
/**
* 生成业务接口的代理对象,代理对象做的事情,在invoke方法中。
* @param clazz 代理类型(接口)
* @return
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz){
// clazz 不是接口不能使用JDK动态代理
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
}
/**
* 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
*/
public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
// 调用前
System.out.println("执行远程方法前,可以做些事情");
// 封装参数,类似于序列化的过程
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(params);
// 链接服务器调用服务
RpcClient client = new RpcClient();
Object rst = client.start(request, host, port);
// 调用后
System.out.println("执行远程方法后,也可以做些事情");
return rst;
}
}
服务提供者
package edu.dongnao.dnrpc.simple;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* RpcServer
* Rpc服务提供者
*/
public class RpcServer {
/**
* 启动指定的网络端口号服务,并监听端口上的请求数据。获得请求数据以后将请求信息委派给服务处理器,放入线程池中执行。
* @param port 监听端口
* @param clazz 服务类所在包名,多个用英文逗号隔开
*/
public void start(int port, String clazz) {
ServerSocket server = null;
try {
// 1. 创建服务端指定端口的socket连接
server = new ServerSocket(port);
// 2. 获取所有rpc服务类
Map<String, Object> services = getService(clazz);
// 3. 创建线程池
Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
while(true){
// 4. 获取客户端连接
Socket client = server.accept();
// 5. 放入线程池中执行
RpcServerHandler service = new RpcServerHandler(client, services);
executor.execute(service);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
//关闭监听
if(server != null)
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 实例化所有rpc服务类,也可用于暴露服务信息到注册中心。
* @param clazz 服务类所在包名,多个用英文逗号隔开
* @return
*/
public Map<String,Object> getService(String clazz){
try {
Map<String, Object> services = new HashMap<String, Object>();
// 获取所有服务类
String[] clazzes = clazz.split(",");
List<Class<?>> classes = new ArrayList<Class<?>>();
for(String cl : clazzes){
List<Class<?>> classList = getClasses(cl);
classes.addAll(classList);
}
// 循环实例化
for(Class<?> cla:classes){
Object obj = cla.newInstance();
services.put(cla.getAnnotation(Service.class).value().getName(), obj);
}
return services;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取包下所有有@Sercive注解的类
* @param pckgname
* @return
* @throws ClassNotFoundException
*/
public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
// 需要查找的结果
List<Class<?>> classes = new ArrayList<Class<?>>();
// 找到指定的包目录
File directory = null;
try {
ClassLoader cld = Thread.currentThread().getContextClassLoader();
if (cld == null)
throw new ClassNotFoundException("无法获取到ClassLoader");
String path = pckgname.replace('.', '/');
URL resource = cld.getResource(path);
if (resource == null)
throw new ClassNotFoundException("没有这样的资源:" + path);
directory = new File(resource.getFile());
} catch (NullPointerException x) {
throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
}
if (directory.exists()) {
// 获取包目录下的所有文件
String[] files = directory.list();
File[] fileList = directory.listFiles();
// 获取包目录下的所有文件
for (int i = 0; fileList != null && i < fileList.length; i++) {
File file = fileList[i];
//判断是否是Class文件
if (file.isFile() && file.getName().endsWith(".class")) {
Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
if(clazz.getAnnotation(Service.class) != null){
classes.add(clazz);
}
}else if(file.isDirectory()){ //如果是目录,递归查找
List<Class<?>> result = getClasses(pckgname+"."+file.getName());
if(result != null && result.size() != 0){
classes.addAll(result);
}
}
}
} else{
throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
}
return classes;
}
}
服务器请求处理
package edu.dongnao.dnrpc.simple;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;
/**
* RpcServerHandler
* 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
*/
public class RpcServerHandler implements Runnable {
// 客户端网络请求socket,可以从中获得网络请求信息
private Socket clientSocket;
// 服务端提供处理请求的类集合
private Map<String, Object> serviceMap;
/**
* @param client 客户端socket
* @param services 所有服务
*/
public RpcServerHandler(Socket client, Map<String, Object> services) {
this.clientSocket = client;
this.serviceMap = services;
}
/**
* 读取网络中客户端请求的信息,找到请求的方法,执行本地方法获得结果,写入网络IO输出中。
*
*/
public void run() {
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
RpcResponse response = new RpcResponse();
try {
// 1. 获取流以待操作
oin = new ObjectInputStream(clientSocket.getInputStream());
oout = new ObjectOutputStream(clientSocket.getOutputStream());
// 2. 从网络IO输入流中请求数据,强转参数类型
Object param = oin.readObject();
RpcRequest request = null;
if(!(param instanceof RpcRequest)){
response.setError(new Exception("参数错误"));
oout.writeObject(response);
oout.flush();
return;
}else{
// 反序列化RpcRequest
request = (RpcRequest) param;
}
// 3. 查找并执行服务方法
Object service = serviceMap.get(request.getClassName());
Class<?> clazz= service.getClass();
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service, request.getParams());
// 4. 返回RPC响应,序列化RpcResponse
response.setResult(result);
// 序列化结果
oout.writeObject(response);
oout.flush();
return;
} catch (Exception e) {
try { //异常处理
if(oout != null){
response.setError(e);
oout.writeObject(response);
oout.flush();
}
} catch (Exception e1) {
e1.printStackTrace();
}
return;
}finally{
try { // 回收资源,关闭流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(clientSocket != null) clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
自定义注解
package edu.dongnao.dnrpc.simple;
/**
* Service
*
*/
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Service
* 一个提供了RPC服务的实现类。
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {
/**
* 注解所属接口类型
* @return
*/
Class<?> value();
}
测试类
服务端先启动
public class ServerTest {
@Test
public void startServer() {
RpcServer server = new RpcServer();
server.start(9998, "edu.dongnao.dnrpc.simple.example");
}
}
客户端
public class ClientTest {
@Test
public void test() {
// 本地没有接口实现,通过代理获得接口实现实例
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9998);
StudentService service = proxy.getProxy(StudentService.class);
System.out.println(service.getInfo());
Student student = new Student();
student.setAge(23);
student.setName("hashmap");
student.setSex("男");
System.out.println(service.printInfo(student));
}
}
最简单的实例就是这样了。