分布式通信框架 - rmi

3,734 阅读4分钟

知识点:

1)什么是rmi
2)简单的实现rmi
3)rmi原理
4)手写rmi框架

首先谈下什么RPC?

Remote procedure call protocal 远程过程调用协议
不用知道具体细节,调用远程系统中类的方法,就跟调用本地方法一样。
RPC协议其实是一种规范。
包括Dubbo,Thrift,rmi,webservice,hessain

网络协议和网络IO对于调用端和服务端来说是透明的。

一个RPC框架应该包含的要素:

RMI概述

rmi(remote method invocation) 远程方法调用
可以认为是RPC的java版本

RMI使用的是JRMP(JAVA Remote Messageing Protocol),可以说JRMP是专门为java定制的通信协议,所以它是纯java的分布式解决方案。

怎么实现一个RMI程序

1)创建远程接口并且继承java.rmi.Remote 接口

package com.llf.rmidemo;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface SayHello extends Remote{
	  public String sayHello(String name)throws RemoteException;
}

2)实现我们这个远程接口并且继承 UnicastRemoteObject

package com.llf.rmidemo;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class SayHelloImpl extends UnicastRemoteObject implements SayHello{

	protected SayHelloImpl() throws RemoteException {
	}

	@Override
	public String sayHello(String name) throws RemoteException {
		return "Hello LLF -->"+name;
	}

}

3)创建服务器程序 调用createRegistry方法注册远程对象

package com.llf.rmidemo;

import java.net.MalformedURLException;
import java.rmi.AlreadyBoundException;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;

public class HelloServer {
	public static void main(String[] args) {
		try {
			SayHello hello=new SayHelloImpl();
			LocateRegistry.createRegistry(8888);
			try {
				Naming.bind("rmi://localhost:8888/sayhello", hello);
				System.out.println("Server start success!");
			} catch (MalformedURLException e) {
				e.printStackTrace();
			} catch (AlreadyBoundException e) {
				e.printStackTrace();
			}
		} catch (RemoteException e) {
			e.printStackTrace();
		}
	}

}

4)创建客户端程序

package com.llf.rmidemo;

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

public class HelloClient {
	public static void main(String[] args) {
		try {
			SayHello hello=(SayHello) Naming.lookup("rmi://localhost:8888/sayhello");
			System.out.println(hello.sayHello("FXP"));
		} catch (MalformedURLException e) {
			e.printStackTrace();
		} catch (RemoteException e) {
			e.printStackTrace();
		} catch (NotBoundException e) {
			e.printStackTrace();
		}
	}

}

结果:

自己去实现一个RMI

1)编写服务器程序,暴露监听,可以使用socket 2)编写客户端程序,通过IP和端口连接到指定的服务,并且把我们的数据做封装(序列化) 3)服务器端收到请求先反序列化在进行业务逻辑处理,把返回结果序列化返回

rmi框架调用时序图

rmi源码分析

我们近乎的可以以如下的图来理解: a) stub和skeleton这俩个身份都是作为代理存在,客户端的称为stub,服务端的称为skeleton,通过这俩个对象屏蔽了远程方法调用的具体细节,这俩个是必不可少的。 b)Registry:注册所,提供了服务名到服务的映射。

结合这上面的图,再以上面的demo代码,我们来扒一扒rmi的底层源码 首先我们看提供服务的server方 createRegistry方法

服务端先创建了一个RegistryImpl的对象,然后做了一个安全校验,这边我们不用关注,重点是看setup方法。

进入到RegistryImpl类

然后进入到UnicastServerRef的exportObject方法 1)首先为传入的RegistryImpl创建一个代理对象stub 2)把UnicastServerRef的skeleton对象设置为当前RegistryImpl对象 3)skeleton,stub,unicastserverRef对象,id和一个boolean构造了一个target对象

再往下追就是export的exportObject方法

主要是调用listen方法创建一个serversocket,启动一条线程等待客户端请求。 至此为止我们的服务端已经起了服务等待客户端连接了。

客户端

这边其实就是创建一个stub的代理对象

用代码来模拟RMI底层过程如下: 新建一个User的对象

package com.llf.rmi;

public class User {
	
	private int age;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

}

编写一个Skeleton类供客户端调用【这块是rmi定义出来屏蔽底层序列化及流连接的,这边模拟写了下底层的序列化及流】

package com.llf.rmi;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

//server程序
public class User_Skeleton extends Thread {

	private UserServer userServer;

	public User_Skeleton(UserServer userServer) {
		this.userServer = userServer;

	}

	public void run() {
		ServerSocket serverSocket = null;
		ObjectInputStream read = null;
		ObjectOutputStream oos = null;
		Socket socket=null;
		try {
			serverSocket = new ServerSocket(8888);
		    socket = serverSocket.accept();
			while (socket != null) {
				read = new ObjectInputStream(socket.getInputStream());
				String method = (String) read.readObject();
				if (method.equals("age")) {
					int age = userServer.getAge();
					oos = new ObjectOutputStream(socket.getOutputStream());
					oos.writeInt(age);
					oos.flush();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (serverSocket != null) {
				try {
					oos.close();
					read.close();
					socket.close();
					serverSocket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

写一个stub

package com.llf.rmi;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class User_Stub extends User {
	private Socket socket;
	
	public User_Stub() {
		try {
			socket=new Socket("localhost", 8888);
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public int getAge(){
		ObjectOutputStream oos=null;
		ObjectInputStream ois=null;
		try {
			oos=new ObjectOutputStream(socket.getOutputStream());
			oos.writeObject("age");
			oos.flush();
			
			ois=new ObjectInputStream(socket.getInputStream());
			return ois.readInt();
		} catch (IOException e) {
			e.printStackTrace();
		}finally {
			try {
				ois.close();
				oos.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		}
		return 0;
	}

}

编写服务器代码

package com.llf.rmi;

public class UserServer extends User{
	public static void main(String[] args) {
		UserServer server=new UserServer();
		server.setAge(18);
		//模拟rmi生成的skeleton代理对象
		User_Skeleton user_Skeleton=new User_Skeleton(server);
		user_Skeleton.start();
		
	}
}

编写客户端代码

package com.llf.rmi;

public class UserClient {
	public static void main(String[] args) {
		User user=new User_Stub();
		int age=user.getAge();
		System.out.println(age);
	}
}