阅读 1247

从零开始的高并发(八)--- RPC框架的简单实现

前言

前情概要

上一篇我们简单过了一遍RPC是什么,三个过程,为什么我们需要它,它的特性和适用场景,RPC的流程及协议定义还有它的框架的一些小知识。理论经常会看的人昏昏欲睡,不知所云。如果能够结合一些代码来说明的话,那就方便理解很多了

以往链接

从零开始的高并发(一)--- Zookeeper的基础概念

从零开始的高并发(二)--- Zookeeper实现分布式锁

从零开始的高并发(三)--- Zookeeper集群的搭建和leader选举

从零开始的高并发(四)--- Zookeeper的分布式队列

从零开始的高并发(五)--- Zookeeper的配置中心应用

从零开始的高并发(六)--- Zookeeper的Master选举及官网小览

从零开始的高并发(七)--- RPC的介绍,协议及框架

内容一:RPC的流程和任务

1. RPC的流程

其实这个在上一篇的2 - ① 也已经提到过了,如果忘了,没关系,我再复制过来

stub:分布式计算中的存根是一段代码,它转换在远程过程调用期间Client和server之间传递的参数

1.客户端处理过程中调用client stub(就像调用本地方法一样),传入参数

2.Client stub将参数编组为消息,然后通过系统调用向服务端发送消息

3.客户端本地操作系统将消息从客户端机器发送到服务端机器

4.服务端操作系统将接收到的数据包传递给client stub

5.server stub解组消息为参数

6.server stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端

2. 从使用者的角度开始分析

1.定义过程接口
2.服务端实现接口的整个过程
3.客户端使用生成的stub代理对象
复制代码

内容二:RPC框架的设计及实现

1. 准备一个Student的实体类及基础接口

客户端生成过程接口的代理对象,通过设计一个客户端代理工厂,使用JDK动态代理即可生成接口的代理对象

① 定义一个StudentService接口

Student类有三个属性name(String),age(int),sex(String),节省篇幅就不贴代码了,提供getter,setter和toString方法即可

public interface StudentService {
	/**
	 * 获取信息
	 * @return
	 */
	public Student getInfo();
	
	//打印student的信息并返回一个boolean值
	public boolean printInfo(Student student);
}
复制代码

并且提供一个简单的实现,其实就是打印一个Student的信息出来而已

@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {

	public Student getInfo() {
		Student person = new Student();
    	person.setAge(25);
		person.setName("说出你的愿望吧~");
		person.setSex("男");
		return person;
	}

	public boolean printInfo(Student person) {
		if (person != null) {
			System.out.println(person);
			return true;
		}
		return false;
	}
	
	public static void main(String[] args) {
		new Thread(()->{
			System.out.println("111");
		}).start();;
	}
}
复制代码

2.客户端的搭建

① 从测试类去了解所需

首先,客户端通过我们的本地代理,获得我们的StudentService的代理类,此时我们客户端本地是肯定不存在StudentService的实现的,此时寻址我们是直接给出来了

public class ClientTest {
	@Test
	public void test() {
		// 本地没有接口实现,通过代理获得接口实现实例
		RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
		StudentService service = proxy.getProxy(StudentService.class);
		
		System.out.println(service.getInfo());
		
		Student student = new Student();
		student.setAge(23);
		student.setName("hashmap");
		student.setSex("男");
		System.out.println(service.printInfo(student));
	}
}
复制代码

此时我们的关注点转到客户端是如何帮我们进行代理的

② 实现了InvocationHandler接口的RpcClientProxy

/**
 * RpcClientProxy
 * 客户端代理服务,客户端往服务端发起的调用将通过客户端代理来发起
 */
public class RpcClientProxy implements InvocationHandler{
	private String host;	// 服务端地址
	private int port;	// 服务端口号
	
	public RpcClientProxy(String host, int port){
		this.host = host;
		this.port = port;
	}
	
	/**
	 * 生成业务接口的代理对象,代理对象做的事情,在invoke方法中。
	 * @param clazz 代理类型(接口)
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> clazz){
		// clazz 不是接口不能使用JDK动态代理
		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
	}
	
	/**
	 * 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
	 */
	public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
		// 调用前
		System.out.println("执行远程方法前,可以做些事情");
		
		// 调用远程服务,需要封装参数,类似于序列化的过程
		RpcRequest request = new RpcRequest();
		request.setClassName(method.getDeclaringClass().getName());
		request.setMethodName(method.getName());
		request.setParamTypes(method.getParameterTypes());
		request.setParams(params);
		// 链接服务器调用服务
		RpcClient client = new RpcClient();
		Object rst = client.start(request, host, port);
		
		// 调用后
		System.out.println("执行远程方法后,也可以做些事情");
		return rst;
	}
}
复制代码

JDK提供了Proxy类来实现我们的动态代理,可以通过newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2)方法来实例化一个代理对象,此时我们传入的参数clazz是规定必须为一个接口的,如果不是接口就不能使用JDK动态代理

而第三个参数RpcClientProxy.this则是newProxyInstance()方法虽然帮我们创建好了实例,但是创建实例完成后的具体动作必须由这个InvocationHandler来提供

InvocationHandler这个接口里面仅仅只有一个 Object invoke(Object var1, Method var2, Object[] var3) throws Throwable,这个方法的参数相信不难理解,第一个是代理对象,第二个是执行的方法,第三个是所需的参数集

回到我们刚刚的代码,在我执行System.out.println(service.getInfo())这条语句的时候,我们的逻辑就会跳到invoke()的实现中来,在invoke()方法的注释中也把过程很详细的说明了,首先我们需要调用远程服务了,进行一个参数的封装,之后就进行一个网络连接把这些参数发送给我们的服务端,此时我们需要用到RpcClient了

③ RpcClient

在start()方法中,我们的RpcRequest request是实现了Serializable接口的,所以此时封装好的数据会转换成一个二进制然后被flush()过去,此时我们消息已经发送了,需要等待服务端的响应,响应我们就需要通过我们的服务端ObjectOutputStream来接收一个输入流

/**
 * RpcClient
 * Rpc客户端,代表业务代码作为客户端,往远端服务发起请求。
 */
public class RpcClient {
	
	/**
	 * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
	 * 
	 * @param request 将要发送的请求数据
	 * @param host 远端服务域名或者ip地址
	 * @param port 远端服务端口号
	 * @return 服务端响应结果
	 * @throws Throwable 抛出的异常
	 */
	public Object start(RpcRequest request, String host, int port) throws Throwable{
		// 打开远端服务连接
		Socket server = new Socket(host, port);
		
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		
		try {
			// 1. 服务端输出流,写入请求数据,发送请求数据
			oout = new ObjectOutputStream(server.getOutputStream());
			oout.writeObject(request);
			oout.flush();
			
			// 2. 服务端输入流,获取返回数据,转换参数类型
			// 类似于反序列化的过程
			oin = new ObjectInputStream(server.getInputStream());
			Object res = oin.readObject();
			RpcResponse response = null;
			if(!(res instanceof RpcResponse)){
				throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
			}else{
				response = (RpcResponse) res;
			}
			
			// 3. 返回服务端响应结果
			if(response.getError() != null){ // 服务器产生异常
				throw response.getError();
			}
			return response.getResult();
		}finally{
			try {	// 清理资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(server != null) server.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}
复制代码

④ 进行参数封装的RpcRequest

/**
 * RpcRequest
 * Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
 */
public class RpcRequest implements Serializable{
	// 需要请求的类名
	private String className;
	
	// 需求请求的方法名
    private String methodName;
    
    // 请求方法的参数类型
    private Class<?>[] paramTypes;
    
    // 请求的参数值
    private Object[] params;
	
	public String getClassName() {
		return className;
	}
	public void setClassName(String className) {
		this.className = className;
	}
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getParamTypes() {
		return paramTypes;
	}
	public void setParamTypes(Class<?>[] paramTypes) {
		this.paramTypes = paramTypes;
	}
	public Object[] getParams() {
		return params;
	}
	public void setParams(Object[] params) {
		this.params = params;
	}
 
}
复制代码

⑤ Rpc服务端响应结果包装RpcResponse

同时也是实现了JDK默认的序列化Serializable

/**
 * RpcResponse
 * Rpc服务端响应结果包装类,在网络上进行传输。
 */
public class RpcResponse implements Serializable {
	// 可能抛出的异常
	private Throwable error;
	// 响应的内容或结果
        private Object result;
    
	public Throwable getError() {
		return error;
	}
	public void setError(Throwable error) {
		this.error = error;
	}
	public Object getResult() {
		return result;
	}
	public void setResult(Object result) {
		this.result = result;
	}
    
    
}
复制代码

3.服务端的搭建

① 服务端的模拟ServerTest

public class ServerTest {

	@Test
	public void startServer() {
		RpcServer server = new RpcServer();
		server.start(9998, "rpc.simple.RpcServer");
	}
	
	public static void main(String[] args) {
	}
}
复制代码

给到一个端口号,参数中带有一个包,功能是扫描某个包下的服务

② start()方法的实现

创建一个Map类型的集合services存放扫描到提供rpc服务的类,此时因为没有放在注册中心上所以就不存在寻址了。后面将会把它放入zookeeper的注册中心

getService()下,我们在ServerTest不是提供了一个包名吗,此时我们先去找到了它们所有的classes(请参考getClasses()方法),getClasses()中我们其实主要是先根据提供的包名往下找,要是目录都有问题的话就抛出异常,如果没问题,就开始遍历此目录下的所有文件,遍历出来的结果如果发现这个文件是class文件,就把其实例化,并且进行判断是否存在一个自定义注解@service,标注了这个注解的类就是RPC服务的实现类。如果存在这个注解,那就是我们需要找的rpc服务,就把它装到一个结果集classes中,如果目录下面仍然是目录,那就自己调用自己,直到看到class文件为止

当我们把所有的class都找到了,回到getService()方法下,就都集中放于一个classList中,然后把它们Map化,就是把接口的名称作为key,把实例作为value(services.put(cla.getAnnotation(Service.class).value().getName(), obj))。

最后再回到start(),进行完服务扫描之后还会有一个RpcServerHandler来进行处理

/**
 * RpcServer
 * Rpc服务提供者
 */
public class RpcServer {
	
	/**
	 * 启动指定的网络端口号服务,并监听端口上的请求数据。获得请求数据以后将请求信息委派给服务处理器,放入线程池中执行。
	 * @param port 监听端口
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 */
	public void start(int port, String clazz) {
		ServerSocket server = null;
		try {
			// 1. 创建服务端指定端口的socket连接
			server = new ServerSocket(port);
			// 2. 获取所有rpc服务类
			Map<String, Object> services = getService(clazz);
			// 3. 创建线程池
			Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
			while(true){
				// 4. 获取客户端连接
				Socket client = server.accept();
				// 5. 放入线程池中执行
				RpcServerHandler service = new RpcServerHandler(client, services);
				executor.execute(service);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			//关闭监听
			if(server != null)
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
		}
	}
	
	/**
	 * 实例化所有rpc服务类,也可用于暴露服务信息到注册中心。
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 * @return
	 */
	public Map<String,Object> getService(String clazz){
		try {
			Map<String, Object> services = new HashMap<String, Object>();
			// 获取所有服务类
			String[] clazzes = clazz.split(",");
			List<Class<?>> classes = new ArrayList<Class<?>>();
			for(String cl : clazzes){
				List<Class<?>> classList = getClasses(cl);
				classes.addAll(classList);
			}
			// 循环实例化
			for(Class<?> cla:classes){
				Object obj = cla.newInstance();
				services.put(cla.getAnnotation(Service.class).value().getName(), obj);
			}
			return services;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 获取包下所有有@Sercive注解的类
	 * @param pckgname
	 * @return
	 * @throws ClassNotFoundException
	 */
	public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
		// 需要查找的结果
		List<Class<?>> classes = new ArrayList<Class<?>>();
		// 找到指定的包目录
		File directory = null;
		try {
			ClassLoader cld = Thread.currentThread().getContextClassLoader();
			if (cld == null)
				throw new ClassNotFoundException("无法获取到ClassLoader");
			String path = pckgname.replace('.', '/');
			URL resource = cld.getResource(path);
			if (resource == null)
				throw new ClassNotFoundException("没有这样的资源:" + path);
			directory = new File(resource.getFile());
		} catch (NullPointerException x) {
			throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
		}
		if (directory.exists()) {
			// 获取包目录下的所有文件
			String[] files = directory.list();
			File[] fileList = directory.listFiles();
			// 获取包目录下的所有文件
			for (int i = 0; fileList != null && i < fileList.length; i++) {
				File file = fileList[i];
				//判断是否是Class文件
				if (file.isFile() && file.getName().endsWith(".class")) {
					Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
					if(clazz.getAnnotation(Service.class) != null){
						classes.add(clazz);
					}
				}else if(file.isDirectory()){ //如果是目录,递归查找
					List<Class<?>> result = getClasses(pckgname+"."+file.getName());
					if(result != null && result.size() != 0){
						classes.addAll(result);
					}
				}
			}
		} else{
			throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
		}
		return classes;
	}
}
复制代码

③ 进行处理的RpcServerHandler

和刚刚的RpcClient非常类似,都是序列化和反序列化的过程,主要是第三步中获得了实例和方法及其参数后,再调用invoke()方法然后把结果放入response的过程

/**
 * RpcServerHandler
 * 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
 */
public class RpcServerHandler implements Runnable {
	
	// 客户端网络请求socket,可以从中获得网络请求信息
	private Socket clientSocket;
	
	// 服务端提供处理请求的类集合
	private Map<String, Object> serviceMap;
	
	/**
	 * @param client 客户端socket
	 * @param services 所有服务
	 */
	public RpcServerHandler(Socket client, Map<String, Object> services) {
		this.clientSocket = client;
		this.serviceMap = services;
	}
 
 
	/**
	 * 读取网络中客户端请求的信息,找到请求的方法,执行本地方法获得结果,写入网络IO输出中。
	 * 
	 */
	public void run() {
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		RpcResponse response = new RpcResponse();
		try {
			// 1. 获取流以待操作
			oin = new ObjectInputStream(clientSocket.getInputStream());
			oout = new ObjectOutputStream(clientSocket.getOutputStream());
			
			// 2. 从网络IO输入流中请求数据,强转参数类型
			Object param = oin.readObject();
			RpcRequest  request = null;
			if(!(param instanceof RpcRequest)){
				response.setError(new Exception("参数错误"));
				oout.writeObject(response);
				oout.flush();
				return;
			}else{
				// 反序列化RpcRequest
				request = (RpcRequest) param;
			}
			
			// 3. 查找并执行服务方法
			Object service = serviceMap.get(request.getClassName());
			Class<?> clazz= service.getClass();
			Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
			Object result = method.invoke(service, request.getParams());
			
			// 4. 返回RPC响应,序列化RpcResponse
			response.setResult(result);
			// 序列化结果
			oout.writeObject(response);
			oout.flush();
			return;
		} catch (Exception e) {
			try {	//异常处理
				if(oout != null){
					response.setError(e);
					oout.writeObject(response);
					oout.flush();
				}
			} catch (Exception e1) {
				e1.printStackTrace();
			}
			return;
		}finally{
			try {	// 回收资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(clientSocket != null) clientSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}
复制代码

4.运行结果

先开启ServerTest再开启ClientTest,简单快捷,注意别去右键跑main方法即可

内容三:优化客户端的举措

1.发现者的引入

设计客户端的时候,在ClientStubInvocationHandler中需要完成的两件事为编组消息和发送网络请求,而将请求的内容编组为消息这件事就交由客户端的stub代理,它除了消息协议和网络层的事务以外,可能还存在一个服务信息发现,此外消息协议可能也是会存在变化的,我们也需要去支持多种协议,这个其实是和框架对协议的支持广度有关的。比如dubbo相对于spring cloud而言对协议的支持就相对灵活一些

此时我们需要得知某服务用的是什么协议,所以我们需要引入一个服务发现者

2.协议层

我们想要做到支持多种协议,类该如何设计(面向接口,策略模式,组合)

此时我们的协议需要抽象出来,对于协议的内容需要进行编组和解组,比如我们上面提供的JSON和HTTP两种不同的实现,而此时客户端的存根里面就不仅仅只是需要服务发现者,还需要我们对于这个协议的支持

① 补充:如何从zookeeper中获取注册信息

主要看regist()方法,我们在注册的时候把服务信息进行了拼接,并创建成临时节点,父节点为持久节点。servicePath是类似于dubbo的一个目录结构,一个根目录/rpc+服务名称serviceName+service,获取服务的方法loadServiceResouces()也不难,根据这些地址获取它们下面的子节点,把所有的url加载出来给到调用者

public class RegistCenter {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public RegistCenter() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceResource serviceResource) {
		String serviceName = serviceResource.getServiceName();
		String uri = JsonMapper.toJsonString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加载配置中心中服务资源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceResource> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceResource> resources = new ArrayList<ServiceResource>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	private void sub(String serviceName, ChangeHandler handler) {
		/*
		String path = centerRootPath + "/"+serviceName+"/service";
		client.subscribeChildChanges(path, new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				handler();
			}
		});
		client.subscribeDataChanges(path, new IZkDataListener() {
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				handler();
			}
			
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				handler();
			}
		});
		*/
	}
	
	interface ChangeHandler {
		/**
		 * 发生变化后给一个完整的属性对象
		 * @param resource
		 */
		void itemChange(ServiceResource resource);
	}
}
复制代码

② ClientStubProxyFactory

/**
 * ClientStubProxyFactory
  *   客户端存根代理工厂
 */
public class ClientStubProxyFactory {

	private ServiceInfoDiscoverer sid;

	private Map<String, MessageProtocol> supportMessageProtocols;

	private NetClient netClient;

	private Map<Class<?>, Object> objectCache = new HashMap<>();
	
	/**
	 * 
	 * 
	 * @param <T>
	 * @param interf
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interf) {
		T obj = (T) this.objectCache.get(interf);
		if (obj == null) {
			obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
					new ClientStubInvocationHandler(interf));
			this.objectCache.put(interf, obj);
		}

		return obj;
	}

	public ServiceInfoDiscoverer getSid() {
		return sid;
	}

	public void setSid(ServiceInfoDiscoverer sid) {
		this.sid = sid;
	}

	public Map<String, MessageProtocol> getSupportMessageProtocols() {
		return supportMessageProtocols;
	}

	public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
		this.supportMessageProtocols = supportMessageProtocols;
	}

	public NetClient getNetClient() {
		return netClient;
	}

	public void setNetClient(NetClient netClient) {
		this.netClient = netClient;
	}
	
	/**
	 * ClientStubInvocationHandler
	 * 客户端存根代理调用实现
	 * @date 2019年4月12日 下午2:38:30
	 */
	private class ClientStubInvocationHandler implements InvocationHandler {
		private Class<?> interf;

		public ClientStubInvocationHandler(Class<?> interf) {
			super();
			this.interf = interf;
		}

		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

			// 1、获得服务信息
			String serviceName = this.interf.getName();
			ServiceInfo sinfo = sid.getServiceInfo(serviceName);

			if (sinfo == null) {
				throw new Exception("远程服务不存在!");
			}

			// 2、构造request对象
			Request req = new Request();
			req.setServiceName(sinfo.getName());
			req.setMethod(method.getName());
			req.setPrameterTypes(method.getParameterTypes());
			req.setParameters(args);

			// 3、协议层编组
			// 获得该方法对应的协议
			MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
			// 编组请求
			byte[] data = protocol.marshallingRequest(req);

			// 4、调用网络层发送请求
			byte[] repData = netClient.sendRequest(data, sinfo);

			// 5解组响应消息
			Response rsp = protocol.unmarshallingResponse(repData);

			// 6、结果处理
			if (rsp.getException() != null) {
				throw rsp.getException();
			}

			return rsp.getReturnValue();
		}
	}
}
复制代码

ClientStub中有两个引用,一个是服务发现接口ServiceInfoDiscoverer,作用为根据服务名获得远程服务信息,提供一个ServiceInfo getServiceInfo(String name)方法,还有就是对于不同协议的支持supportMessageProtocols,MessageProtocol我们也是定义了一个接口,这个接口就需要比较详细了,编码成二级制,和解码成Request等,对于response也是同样这么个过程

/**
 * 通信协议接口
 * MessageProtocol
 */
public interface MessageProtocol {
	/**
	 * 编组请求消息
	 * @param req
	 * @return
	 */
	byte[] marshallingRequest(Request req);
	
	/**
	 * 解编组请求消息
	 * @param data
	 * @return
	 */
	Request unmarshallingRequest(byte[] data);
	
	/**
	 * 编组响应消息
	 * @param rsp
	 * @return
	 */
	byte[] marshallingResponse(Response rsp);
	
	/**
	 * 解编组响应消息
	 * @param data
	 * @return
	 */
	Response unmarshallingResponse(byte[] data);
}
复制代码

此时又存在一些问题,单纯依靠编组和解组的方法是不够的,编组和解组的操作对象是请求,响应,但是它们的内容是不同的,此时我们又需要定义框架标准的请求响应类

request有具体的服务名,服务方法,消息头,参数类型和参数,同样的response也有状态(通过枚举),消息头,返回值及类型以及是否存在异常。

此时协议层扩展为4个方法

将消息协议独立为一层,客户端和服务端都需要使用

3. 网络层

网络层的工作主要是发送请求和获得响应,此时我们如果需要发起网络请求必定先要知道服务地址,此时我们利用下图中serviceInfo对象作为必须依赖,setRequest()方法里面会存在发送数据,还有发送给谁,此时给出了BIO和Netty两种实现

所以我们需要的三个依赖就都出来了,一个是服务发现者,一个是协议支持,再然后就是我们网络层的NetClient

4. 总图

紫色代表客户端代理部分,浅绿色属于服务发现,浅蓝色属于协议部分

5.代码部分(可直接无视)

因为这些代码和主要的思路已经没有瓜葛了,只是一些功能代码,所以可以直接忽略了。如果实在是想自己跑一下,也可以问我要一个小样。

① 依旧是回到我们的ClientStubProxyFactory

可以和内容二的RpcClientProxy做一个对比,在原有的基础上加上了三个依赖ServiceInfoDiscoverer,supportMessageProtocols,netClient

在ClientStubProxyFactory中对Object做了一个缓存,如果已经存在这个缓存就直接返回,没有的话加入到缓存中然后new出来,只是一个小小的不同。

② invoke()方法的改变

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

	// 1、获得服务信息
	String serviceName = this.interf.getName();
	ServiceInfo sinfo = sid.getServiceInfo(serviceName);

	if (sinfo == null) {
		throw new Exception("远程服务不存在!");
	}

	// 2、构造request对象
	Request req = new Request();
	req.setServiceName(sinfo.getName());
	req.setMethod(method.getName());
	req.setPrameterTypes(method.getParameterTypes());
	req.setParameters(args);

	// 3、协议层编组
	// 获得该方法对应的协议
	MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
	// 编组请求
	byte[] data = protocol.marshallingRequest(req);

	// 4、调用网络层发送请求
	byte[] repData = netClient.sendRequest(data, sinfo);

	// 5、解组响应消息
	Response rsp = protocol.unmarshallingResponse(repData);

	// 6、结果处理
	if (rsp.getException() != null) {
		throw rsp.getException();
	}

	return rsp.getReturnValue();
}
复制代码

首先是服务发现,在我们执行 ① 中提到的getProxy()方法时,此时代理的接口已经直接告诉我们了,所以我们就直接获得了接口信息interf,然后调用getName()方法获取接口的名称,通过接口名,调用服务发现者ServiceInfo提供的getServiceInfo()方法就能获取服务的具体信息,然后放入请求参数request里面,接下来给request的各个属性赋值

之后我们就开始寻找这个服务所对应的协议,获得协议之后可以获取协议支持对象,之后进行编组请求,转换成二进制,通过netClient发送过去,顺带连同服务端信息给出去。获取结果repData进行解组(二进制回到response),之后进行结果处理。

③ 服务发现者的实现

之前也提到了,服务发现者ServiceInfoDiscoverer是作为一个接口提供了getServiceInfo()方法的

有两种不同的实现,本地实现我们可以自己搞一个配置文件加载进来,把相关的服务信息弄进去得了

zookeeper的服务发现实现如下,类似于我们一开始在2 - ① 中补充的zookeeper的内容

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public ZookeeperServiceInfoDiscoverer() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceInfo serviceResource) {
		String serviceName = serviceResource.getName();
		String uri = JSON.toJSONString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加载配置中心中服务资源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceInfo> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	@Override
	public ServiceInfo getServiceInfo(String name) {
		List<ServiceInfo> list = loadServiceResouces(name);
		ServiceInfo info = list.get(0);
		list.forEach((e)->{
			if(e != info) {
				info.addAddress(e.getAddress().get(0));
			}
		});
		return info;
	}

}
复制代码

④ 协议支持相关

这里只实现了JSON的,通过fastJSON来实现

public class JSONMessageProtocol implements MessageProtocol {

	@Override
	public byte[] marshallingRequest(Request req) {
		Request temp = new Request();
		temp.setServiceName(req.getServiceName());
		temp.setMethod(req.getMethod());
		temp.setHeaders(req.getHeaders());
		temp.setPrameterTypes(req.getPrameterTypes());

		if (req.getParameters() != null) {
			Object[] params = req.getParameters();
			Object[] serizeParmas = new Object[params.length];
			for (int i = 0; i < params.length; i++) {
				serizeParmas[i] = JSON.toJSONString(params[i]);
			}

			temp.setParameters(serizeParmas);
		}

		return JSON.toJSONBytes(temp);
	}

	@Override
	public Request unmarshallingRequest(byte[] data) {
		Request req = JSON.parseObject(data, Request.class);
		if(req.getParameters() != null) {
			Object[] serizeParmas = req.getParameters();
			Object[] params = new Object[serizeParmas.length];
			for(int i = 0; i < serizeParmas.length; i++) {
				Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
				params[i] = param;
			}
			req.setParameters(params);
		}
		return req;
	}

	@Override
	public byte[] marshallingResponse(Response rsp) {
		Response resp = new Response();
		resp.setHeaders(rsp.getHeaders());
		resp.setException(rsp.getException());
		resp.setReturnValue(rsp.getReturnValue());
		resp.setStatus(rsp.getStatus());
		return JSON.toJSONBytes(resp);
	}

	@Override
	public Response unmarshallingResponse(byte[] data) {
		return JSON.parseObject(data, Response.class);
	}

}
复制代码

⑤ NetClient相关

分为BIO和Netty两种模式,netty中使用了EventLoopGroup

BIO:
public class BioNetClient implements NetClient {
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		List<String> addressList = sinfo.getAddress();
		int randNum = new Random().nextInt(addressList.size());
		String address = addressList.get(randNum);
		String[] addInfoArray = address.split(":");
		try {
			return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
		} catch (Throwable e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/**
	 * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
	 * 
	 * @param requestData 将要发送的请求数据
	 * @param host 远端服务域名或者ip地址
	 * @param port 远端服务端口号
	 * @return 服务端响应结果
	 * @throws Throwable 抛出的异常
	 */
	private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
		// 打开远端服务连接
		Socket serverSocket = new Socket(host, port);
		
		InputStream in = null;
		OutputStream out = null;
		
		try {
			// 1. 服务端输出流,写入请求数据,发送请求数据
			out = serverSocket.getOutputStream();
			out.write(requestData);
			out.flush();
			
			// 2. 服务端输入流,获取返回数据,转换参数类型
			// 类似于反序列化的过程
			in = serverSocket.getInputStream();
			byte[] res = new byte[1024];
			int readLen = -1;
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			while((readLen = in.read(res)) > 0) {
				baos.write(res, 0, readLen);
			}
			return baos.toByteArray();
		}finally{
			try {	// 清理资源,关闭流
				if(in != null) in.close();
				if(out != null) out.close();
				if(serverSocket != null) serverSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
复制代码
netty模式:
public class NettyNetClient implements NetClient {
	private SendHandler sendHandler;
	private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		try {
			List<String> addressList = sinfo.getAddress();
			int randNum = new Random().nextInt(addressList.size());
			String address = addressList.get(randNum);
			String[] addInfoArray = address.split(":");
			SendHandler handler = sendHandlerMap.get(address);
			if(handler == null) {
				sendHandler = new SendHandler(data);
				new Thread(()->{
					try {
						connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
					} catch (NumberFormatException e) {
						e.printStackTrace();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}).start();
			}
			byte[] respData = (byte[]) sendHandler.rspData();
			return respData;
		} catch (NumberFormatException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	public void connect(String host, int port) throws Exception {
        // 配置客户端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //EchoClientHandler handler = new EchoClientHandler();
            
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(sendHandler);
                 }
             });

            // 启动客户端连接
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 释放线程组资源
            group.shutdownGracefully();
        }
    }
}
复制代码

⑥ 运行结果

可以自行模拟一个消费者和一个生产者进行测试,这里就不贴出来了

finally

之后会继续dubbo的内容

下一篇:从零开始的高并发(九)--- dubbo的核心功能及协议

关注下面的标签,发现更多相似文章
评论