RPC
什么是
RPC
?
RPC
全称为Remote Procedure Call
翻译过来就是远程过程调用
RPC
和HTTP
的区别
HTTP
是一种协议,RPC
可以通过HTTP
来实现,也可以通过Socket
自己实现一套协议来实现。
论复杂度,RPC
框架肯定是高于简单的HTTP
接口的。但毋庸置疑,HTTP
接口由于受限于HTTP
协议,需要带HTTP
请求头,导致传输起来效率或者说安全性不如RPC
并且要否认一点,HTTP
协议相对于TCP
报文协议,增加的开销在于连接与断开。HTTP
是支持连接池复用的(HTTP 1.x)
Thrift架构
Apache Thrift
是一个跨语言的服务框架,本质上为RPC
,同时具有序列化、反序列化机制Thrift
包含一个完整的堆栈结构用于构建客户端和服务器端。
传输协议(TProtocol
)
Thrift
可以让用户选择客户端和服务端之间传输通信协议的区别,在传输协议上总体分为文本和二进制(binary
)传输协议,为了节省带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数。
TBinaryProtocol
:二进制编码格式进行数据传输TCompactProtocol
:高效率的、密集的二进制编码格式进行数据传输TJSONProtocol
:使用JSON
的数据编码协议进行数据传输TDebugProtocol
:使用易懂的可读文本格式,以便于debug
数据传输方式(TTransport
)
TTransport
是与底层数据传输紧密相关的传输层。每一种支持的底层传输方式都存在一个与之对应的TTransport
。在这一层,数据是按字节流处理的,即传输层看到的是一个又一个的字节,并把这些字节按顺序发送和接收。TTransport
并不了解它所传输的数据是什么类型,实际上传输层也不关心数据是什么类型,只需要按照字节方式对数据进行发送和接收即可。数据类型的解析在TProtocol
这一层完成。
TSocket
:使用阻塞式I/O
进行传输,是最常见的模式THttpTransport
:采用HTTP
协议进行数据传输TFramedTransPort
: 以frame
为单位进行传输,非阻塞式服务中使用;TFileTransPort
:以文件形式进行传输TMemoryTransport
:将内存用于I/O
传输TZlibTransport
:使用zlib
进行压缩, 与其他传输方式联合使用TBufferedTransport
对某个transport
对象操作的数据进行buffer
,即从buffer
中读取数据进行传输,或将数据直接写入到buffer
服务端网络模型(TServer
)
TServer
在thrift
框架中的主要任务是接收client
请求,并转发到某个processor
上进行请求处理。针对不同的访问规模,thrift
提供了不同TServer
模型。thrift
目前支持的server
模型包括:
TSimpleServer
: 单线程服务器端使用标准的阻塞式I/O
TTHreaadPoolServer
: 多线程服务器端使用标准的阻塞式I/O
TNonblockingServer
:多线程服务器端使用非阻塞式I/O
TThreadedServer
:多线程网络模型,使用阻塞式I/O
,为每个请求创建一个线程
对于`golang`来说,只有`TSimpleServer`服务模式,并且是非阻塞的
TProcesser
TProcessor
主要对TServer
中一次请求的inputProtocol
和outputProtocol
进行操作,也就是从inputProtocol
中读出client
的请求数据,向outputProtocol
写入用户逻辑的返回值。TProcessorprocess
是一个非常关键的处理函数,因为client
所有的rpc
调用都会经过该函数处理并转发
ThriftClient
ThriftClient
跟TProcessor
一样主要操作inputProtocol
和outputProtocol
,不同的是thriftClient
将rpc
调用分为send
和receive
两个步骤:
send
步骤,将用户的调用参数作为一个整体的struct
写入TProtocol
,并发送到TServer
。send
结束后,thriftClient
便立即进入receive
状态等待TServer
的响应。对于TServer
的响应,使用返回值解析类进行返回值解析,完成rpc
调用。
TSimpleServer服务模式
实际上这不是典型的TSimpleServer
,因为它在接受套接字后没有被阻塞。
它更像是一个TThreadedServer
,可以处理不同goroutine
中的不同连接。
如果golang
用户在客户端实现conn-pool
之类的东西这将有效。
type TSimpleServer struct {
quit chan struct{} // 采用阻塞channel进行判断
processorFactory TProcessorFactory
serverTransport TServerTransport
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
}
以下代码thrift-idl
为,接下来的解析以此为例
namespace go echo
struct EchoReq {
1: string msg;
}
struct EchoRes {
1: string msg;
}
service Echo {
EchoRes echo(1: EchoReq req);
}
服务端Server代码
func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}
func (p *TSimpleServer) AcceptLoop() error {
for {
// 此处的Accept()是阻塞的,是调用listener.Accept()
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}
如果server
此时还在处理请求,服务端突然重启,thrift 1.0
是无法做到优雅重启的,但是go thrift
的最新版本采用了golang waitgroup
的方式实现了优雅重启~
func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}
Process
处理逻辑
func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
// 获取传递过来的name,如果存在则处理
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
// 异常逻辑
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x3
}
TServer
接收到rpc
请求之后,调用TProcessorprocess
进行处理。
TProcessorprocess
首先调用TTransport.readMessageBegin
接口,读出rpc
调用的名称和rpc
调用类型。
如果rpc
调用类型是rpc call
,则调用TProcessor.process_fn
继续处理,对于未知的rpc
调用类型,则抛出异常。
TProcessor.process_fn
根据rpc
调用名称,到自己的processMap
中查找对应的rpc
处理函数。如果存在对应的rpc
处理函数,则调用该处理函数继续进行请求响应。不存在则抛出异常。
func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := EchoEchoArgs{}
// 读取入参的参数
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, err
}
iprot.ReadMessageEnd()
result := EchoEchoResult{}
var retval *EchoRes
var err2 error
// 此处是thrift为什么err不能传错误,如果传业务错误会被阻塞
if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
服务端stop
代码
var once sync.Once
func (p *TSimpleServer) Stop() error {
q := func() {
p.quit <- struct{}{}
p.serverTransport.Interrupt()
}
once.Do(q)
return nil
}
stop
函数比较简单,可以看出直接向阻塞队列里面写入数据,然后server
不再接受请求
客户端代码
Client
调用的函数
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {
if err = p.sendEcho(req); err != nil {
return
}
return p.recvEcho()
}
sendEcho()
函数
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
// seqid + 1
p.SeqId++
if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
return
}
// 构建参数
args := EchoEchoArgs{
Req: req,
}
if err = args.Write(oprot); err != nil {
return
}
// 通知服务器发送完毕
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
recvEcho()
函数
func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
//
method, mTypeId, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "echo" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
var error1 error
error1, err = error0.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error1
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
return
}
result := EchoEchoResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}
thrift在mac机器安装问题
- 问题1:
go get git.apache.org/thrift.git/lib/go/thrift
失败 - 问题2:直接使用
github.com
提供的版本会报未知错误
问题2需要根据你的thrift -version
来判断下载哪一个版本的thrift
,比如我的thrift版本是0.10.0
那么需要下载的thrift
地址为https://github.com/apache/thrift/archive/0.10.0.zip
手动创建mkdir -p git.apache.org/thrift.git/lib/go/
目录,然后将下载后的go
文件移至该目录下~