一个轻量级RPC的实现

6,634 阅读10分钟

前言

最近公司在做分布式相关的东西,需要用到RPC。以前对RPC不是很了解,网上也看了很多文章,发现看过之后并没有加深我的理解,仍然云里雾里,只知道是远程过程调用,一个计算机上的程序可以调用另一个计算机上的服务,或一个进程调用另一个进程提供的服务,仅此而已。也用了golang官方的net/rpc库实现了这个目的,但是对net/rpc的底层不了解(只是会用),本着一颗“钻牛角尖”的心,我决定深入研究一下RPC的原理,并自己实现一个RPC,故有了这篇文章。

什么是RPC

维基百科对RPC是这样解释的:

In distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in a different address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction.

我就不翻译了,大体是说程序A调用程序B,A跟B不在一个地址空间(通常A跟B也不在同一台电脑上),但是A调用B就跟调用本地的程序是一样的,程序员无需对这个远程的交互细节进行额外的编程。

这里有两个关键词,我用黑体标出来了。 分布式:表明了RPC的应用场景,一般是用在多台计算机,而不是单台计算机上。 在不同的地址空间:说明了至少有两个进程,一个服务端进程,一个客户端进程。服务端进程提供服务(暴露出某些接口),客户端进程调用服务。当然测试的时候服务端和客户端程序写在一个文件里面也行,在主线程里面写服务端程序,提供接口,然后新开一个线程写客户端程序,调用接口也是可以的。

RPC原理图

实现一个RPC需要解决哪些问题

从RPC原理图中我们可以看出,server端是服务提供方,client端是服务调用方。既然server端可以提供服务,那么它要先实现服务的注册,只有注册过的服务才能被client端调用。前文也说过,client端和server端一般不在一个进程内,甚至不在一个计算机内,那么他们之间的通信必然是通过网络传输,这样就涉及到了网络传输协议,说的更直白的,就是如何将client端的数据(一般是要调用的服务名和相应的参数)安全传输到server端,而server端也能完整的接收到。在client端和server端,数据一般是以对象的形式存在,而对象是无法进行网络传输的,在网络传输之前,我们需要先把对象序列化成字节流,然后传输这些字节流,server端在接收到这些字节流之后,再反序列化得到原始对象,这就是序列化与反序列化。总结一下,要实现一个RPC就必须解决这三个问题,即

  1. 服务的注册
  2. 网络传输
  3. 序列化与反序列化

调用形式

在讲解具体的实现之前,我们先看一下我们的testrpc是如何使用的。 server.go

package main

import (
	"log"
	"net"
	"testrpc"
)

type Args struct {
	A, B int
}

type Arith int

func (t *Arith) Multiply(args Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func main() {
	// 创建一个rpc server对象
	newServer := testrpc.NewServer()

	// 向rpc server对象注册一个Arith对象,注册后,client就可以调用Arith的Multiply方法
	arith := new(Arith)
	newServer.Register(arith)

	// 监听本机的1234端口
	l, e := net.Listen("tcp", "127.0.0.1:1234")
	if e != nil {
		log.Fatalf("net.Listen tcp :0: %v", e)
	}

	for {
		// 阻塞直到从1234端口收到一个网络连接
		conn, e := l.Accept()
		if e != nil {
			log.Fatalf("l.Accept: %v", e)
		}

		//开始工作
		go newServer.ServeConn(conn)
	}
}

代码比较简单,也有注释,这里简单说明一下流程:我们先是new出来了一个rpc server对象,然后向这个server注册了Arith对象,注册后,client就可调用Arith暴露出来的所有方法,这里只有Multiply。然后我们监听了本机的1234端口,并且在for循环中等待来自1234端口的连接,等来了一个连接我们就调用ServeConn方法在一个新的goroutine中处理这个连接,然后我们继续等待新的连接,如此反复。

client.go

package main

import (
	"log"
	"net"
	"os"
	"testrpc"
)

type Args struct {
	A, B int
}

func main() {

	// 连接本机的1234端口,返回一个net.Conn对象
	conn, err := net.Dial("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Println(err.Error())
		os.Exit(-1)
	}

	// main函数退出时关闭该网络连接
	defer conn.Close()

	// 创建一个rpc client对象
	client := testrpc.NewClient(conn)
	// main函数退出时关闭该client
	defer client.Close()

	// 调用远端Arith.Multiply函数
	args := Args{7, 8}
	var reply int
	err = client.Call("Arith.Multiply", args, &reply)
	if err != nil {
		log.Fatal("arith error:", err)
	}
	log.Println(reply)
}

我们先连接本机的1234端口(这个端口server.go在监听),得到一个net.Conn对象,然后用这个对象new出来了一个rpc client,然后再通过这个client调用服务端提供的方法Multiply,计算完后把结果存到reply中。代码很简单,就不多说了。

这个用法参考了golang官方的net/rpc库,有兴趣的读者也可以去学习一下net/rpc的使用方法。

实现原理

Server的定义

前面说过,server端要解决的问题有服务的注册,也就是Register方法,那么server端必须要能够存储这些服务,所以server的定义可以如下:

type Service struct {
	Method    reflect.Method
	ArgType   reflect.Type
	ReplyType reflect.Type
}

type Server struct {
	ServiceMap  map[string]map[string]*Service
	serviceLock sync.Mutex
}

一个Service对象就对应一个服务,一个服务包括方法、参数类型和返回值类型。Server有两个属性:ServiceMap和serviceLock,ServiceMap是一系列service的集合,之所以要以Map的形式是为了方便查找,serviceLock是为了保护ServiceMap,确保同一时刻只有一个goroutine能够写ServiceMap。

服务的注册

func (server *Server) Register(obj interface{}) error {
	server.serviceLock.Lock()
	defer server.serviceLock.Unlock()

	//通过obj得到其各个方法,存储在servicesMap中
	tp := reflect.TypeOf(obj)
	val := reflect.ValueOf(obj)
	serviceName := reflect.Indirect(val).Type().Name()
	if _, ok := server.ServiceMap[serviceName]; ok {
		return errors.New(serviceName + " already registed.")
	}

	s := make(map[string]*Service)
	numMethod := tp.NumMethod()
	for m := 0; m < numMethod; m++ {
		service := new(Service)
		method := tp.Method(m)
		mtype := method.Type
		mname := method.Name

		service.ArgType = mtype.In(1)
		service.ReplyType = mtype.In(2)
		service.Method = method
		s[mname] = service
	}
	server.ServiceMap[serviceName] = s
	server.ServerType = reflect.TypeOf(obj)
	return nil
}

这里把前面的调用Register的代码放出来一起看可能会更清楚一些。

type Arith int

func (t *Arith) Multiply(args Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

...
newServer := testrpc.NewServer()
newServer.Register(new(Arith))
...

Register的大概逻辑就是拿到obj(Register的参数)的各个暴露出来的方法(这里只有一个Multiply),然后存到server的ServiceMap中。这里主要用到了golang的reflect,如果对reflect不了解的话看Register代码还是比较吃力的。网上有很多讲解reflect的文章,建议不了解reflect的读者先去看看,这里就不讲了。注册之后,ServiceMap大概是这个样子

{
	"Arith": {"Multiply":&{Method:Multiply, ArgType:main.Args, ReplyType:*int}}	
}

网络传输

testrpc的网络传输是基于golang提供的net.Conn,这个net.Conn提供了两个方法:Read和Write。Read表示从网络连接中读取数据,Write表示向网络连接中写数据。我们就基于这两个方法来实现我们的网络传输,代码如下:

const (
	EachReadBytes = 500
)

type Transfer struct {
	conn net.Conn
}

func NewTransfer(conn net.Conn) *Transfer {
	return &Transfer{conn: conn}
}

func (trans *Transfer) ReadData() ([]byte, error) {
	finalData := make([]byte, 0)
	for {
		data := make([]byte, EachReadBytes)
		i, err := trans.conn.Read(data)
		if err != nil {
			return nil, err
		}
		finalData = append(finalData, data[:i]...)
		if i < EachReadBytes {
			break
		}
	}
	return finalData, nil
}

func (trans *Transfer) WriteData(data []byte) (int, error) {
	num, err := trans.conn.Write(data)
	return num, err
}

ReadData是从网络连接中读取数据,每次读500字节(由EachReadBytes指定),直到读完为止,然后把读到的数据返回;WriteData是向网络连接中写数据

序列化与反序列化

上节讲到了网络传输,我们知道,传输的对象是字节流。序列化就是负责把对象变成字节流的,相反的,反序列化就是负责将字节流变成程序中的对象的。在网络传输之前我们要先进行序列化,testrpc采用了json做为序列化方式,未来可能会加入其它的序列化方式,如gob、xml、protobuf等。我们先来看下采用json做为序列化的代码:

type EdCode int

func (edcode EdCode) encode(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (edcode EdCode) decode(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

这里采用了golang官方提供的json库,代码很简单,就不解释了。

Server端处理网络连接

  1. 构造一个Transfer对象,代表一个网络传输
  2. 调用Transfer的ReadData方法从网络连接中读数据
  3. 调用EdCode的decode方法将数据反序列化成普通对象
  4. 获取反序列化后数据,如方法名、参数
  5. 根据方法名查找ServiceMap,拿到对应的service
  6. 调用对应的service,得到结果
  7. 将结果序列化
  8. 使用Transfer的WriteData方法的写回到网络连接中

以上就是Server端在拿到Client端请求后的处理过程,我们把它封装在了ServeConn方法里。代码比较长,就不在这里贴了,有兴趣的可以去github里面看。

Client端发起网络连接

  1. 构造一个Transfer对象,代表一个网络传输
  2. 将要调用的服务名和参数序列化
  3. 使用Transfer的WriteData方法将序列化后的数据的写入到网络连接中
  4. 阻塞直到Server端计算完
  5. 调用Transfer的ReadData方法从网络连接中读取计算后的结果
  6. 将结果反序列化后返回给client 代码如下:
func (client *Client) Call(methodName string, req interface{}, reply interface{}) error {

	// 构造一个Request
	request := NewRequest(methodName, req)

	// encode
	var edcode EdCode
	data, err := edcode.encode(request)
	if err != nil {
		return err
	}

	// write
	// 构造一个Transfer
	trans := NewTransfer(client.conn)
	_, err = trans.WriteData(data)
	if err != nil {
		log.Println(err.Error())
		return err
	}

	// read
	data2, err := trans.ReadData()
	if err != nil {
		log.Println(err.Error())
		return err
	}

	// decode and assin to reply
	edcode.decode(data2, reply)

	// return
	return nil
}

Client的定义很简单,就一个代表网络连接的conn,代码如下:

type Client struct {
	conn net.Conn
}

为什么不用Python来实现

由于我平常写Python代码写得比较多,如果用Python来实现这个testrpc的话确实要更快。那么为什么要使用golang呢?因为我们公司最近做的一个项目是基于golang的net/rpc来实现的,工作之便,了解了net/rpc的使用,也稍微看了下net/rpc的底层代码,猜出了其大概原理,于是就想自己也写一个,就这样。当然,以后有机会的话再用Python实现一遍。

总结

本文主要讲述了RPC的原理,以及实现了一个轻量级的RPC。这里说一下我在调试的时候遇到的问题,由于golang是强类型语言,那么我必须要面对的问题是,怎么在一个对象经过序列化和反序列化之后依然保持其原来的类型。比如一个Args类型的对象(读者可以翻到最前面看看Args是如何定义的)在序列化和反序列化之后变成了map[string]interface{}类型,用map[string]interface{}类型当做Args类型传参是会报错的,这部分感兴趣的读者可以去看看代码。读者可能看过这篇文章没什么感觉,看过一遍也就看过了,感觉有收获又感觉没收获,我建议把代码下载下来,然后在自己的电脑上跑一下,调试一下,我相信你会有很深的理解,不管是对RPC,还是对golang的reflect等。比如说我,我以前对reflect掌握的不是很好,写完这个testrpc之后基本掌握了reflect的使用。

代码在github上,欢迎给个star和提交issue,也欢迎你提出自己的看法,比如这段代码写得不好,可以这么优化等等,都可以与我交流,多谢!

github地址:https://github.com/TanLian/testrpc

另也可关注下我个人的技术公众号,期待一起进步。