阅读 401

thrift golang 解析

RPC

什么是RPC?

RPC全称为Remote Procedure Call 翻译过来就是远程过程调用

RPCHTTP的区别

HTTP是一种协议,RPC可以通过HTTP来实现,也可以通过Socket自己实现一套协议来实现。

论复杂度,RPC框架肯定是高于简单的HTTP接口的。但毋庸置疑,HTTP接口由于受限于HTTP协议,需要带HTTP请求头,导致传输起来效率或者说安全性不如RPC

并且要否认一点,HTTP协议相对于TCP报文协议,增加的开销在于连接与断开。HTTP是支持连接池复用的(HTTP 1.x)

Thrift架构

Apache Thrift是一个跨语言的服务框架,本质上为RPC,同时具有序列化、反序列化机制Thrift包含一个完整的堆栈结构用于构建客户端和服务器端。

传输协议(TProtocol)

Thrift可以让用户选择客户端和服务端之间传输通信协议的区别,在传输协议上总体分为文本和二进制(binary)传输协议,为了节省带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数。

  • TBinaryProtocol:二进制编码格式进行数据传输
  • TCompactProtocol:高效率的、密集的二进制编码格式进行数据传输
  • TJSONProtocol:使用 JSON 的数据编码协议进行数据传输
  • TDebugProtocol:使用易懂的可读文本格式,以便于debug

数据传输方式(TTransport)

TTransport是与底层数据传输紧密相关的传输层。每一种支持的底层传输方式都存在一个与之对应的TTransport。在这一层,数据是按字节流处理的,即传输层看到的是一个又一个的字节,并把这些字节按顺序发送和接收。TTransport并不了解它所传输的数据是什么类型,实际上传输层也不关心数据是什么类型,只需要按照字节方式对数据进行发送和接收即可。数据类型的解析在TProtocol这一层完成。

  • TSocket:使用阻塞式 I/O进行传输,是最常见的模式
  • THttpTransport:采用HTTP协议进行数据传输
  • TFramedTransPort: 以frame为单位进行传输,非阻塞式服务中使用;
  • TFileTransPort:以文件形式进行传输
  • TMemoryTransport:将内存用于I/O传输
  • TZlibTransport:使用zlib进行压缩, 与其他传输方式联合使用
  • TBufferedTransport对某个transport对象操作的数据进行buffer,即从buffer中读取数据进行传输,或将数据直接写入到buffer

服务端网络模型(TServer)

TServerthrift框架中的主要任务是接收client请求,并转发到某个processor上进行请求处理。针对不同的访问规模,thrift提供了不同TServer模型。thrift目前支持的server模型包括:

  • TSimpleServer: 单线程服务器端使用标准的阻塞式I/O
  • TTHreaadPoolServer: 多线程服务器端使用标准的阻塞式I/O
  • TNonblockingServer:多线程服务器端使用非阻塞式I/O
  • TThreadedServer:多线程网络模型,使用阻塞式I/O,为每个请求创建一个线程

对于`golang`来说,只有`TSimpleServer`服务模式,并且是非阻塞的

TProcesser

TProcessor主要对TServer中一次请求的inputProtocoloutputProtocol进行操作,也就是从inputProtocol中读出client的请求数据,向outputProtocol写入用户逻辑的返回值。TProcessorprocess是一个非常关键的处理函数,因为client所有的rpc调用都会经过该函数处理并转发

ThriftClient

ThriftClientTProcessor一样主要操作inputProtocoloutputProtocol,不同的是thriftClientrpc调用分为sendreceive两个步骤:

  • send步骤,将用户的调用参数作为一个整体的struct写入TProtocol,并发送到TServer
  • send结束后,thriftClient便立即进入receive状态等待TServer的响应。对于TServer的响应,使用返回值解析类进行返回值解析,完成rpc调用。

TSimpleServer服务模式

实际上这不是典型的TSimpleServer因为它在接受套接字后没有被阻塞。 它更像是一个TThreadedServer,可以处理不同goroutine中的不同连接。 如果golang用户在客户端实现conn-pool之类的东西这将有效。

type TSimpleServer struct {
	quit chan struct{}     // 采用阻塞channel进行判断

	processorFactory       TProcessorFactory
	serverTransport        TServerTransport
	inputTransportFactory  TTransportFactory
	outputTransportFactory TTransportFactory
	inputProtocolFactory   TProtocolFactory
	outputProtocolFactory  TProtocolFactory
}

复制代码

以下代码thrift-idl为,接下来的解析以此为例

namespace go echo

struct EchoReq {
    1: string msg;
}

struct EchoRes {
    1: string msg;
}

service Echo {
    EchoRes echo(1: EchoReq req);
}
复制代码

服务端Server代码

func (p *TSimpleServer) Serve() error {
	err := p.Listen()
	if err != nil {
		return err
	}
	p.AcceptLoop()
	return nil
}

func (p *TSimpleServer) AcceptLoop() error {
	for {
	    // 此处的Accept()是阻塞的,是调用listener.Accept()
		client, err := p.serverTransport.Accept()
		if err != nil {
			select {
			case <-p.quit:
				return nil
			default:
			}
			return err
		}
		if client != nil {
			go func() {
				if err := p.processRequests(client); err != nil {
					log.Println("error processing request:", err)
				}
			}()
		}
	}
}
复制代码

如果server此时还在处理请求,服务端突然重启,thrift 1.0是无法做到优雅重启的,但是go thrift的最新版本采用了golang waitgroup的方式实现了优雅重启~

func (p *TSimpleServer) processRequests(client TTransport) error {
	processor := p.processorFactory.GetProcessor(client)

	inputTransport := p.inputTransportFactory.GetTransport(client)
	outputTransport := p.outputTransportFactory.GetTransport(client)

	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
	defer func() {
		if e := recover(); e != nil {
			log.Printf("panic in processor: %s: %s", e, debug.Stack())
		}
	}()
	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		ok, err := processor.Process(inputProtocol, outputProtocol)

		if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
			return nil
		} else if err != nil {
			log.Printf("error processing request: %s", err)
			return err
		}
		if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
			continue
		}
 		if !ok {
			break
		}
	}
	return nil
}
复制代码

Process处理逻辑

func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	name, _, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return false, err
	}
	// 获取传递过来的name,如果存在则处理
	if processor, ok := p.GetProcessorFunction(name); ok {
		return processor.Process(seqId, iprot, oprot)
	}
	// 异常逻辑
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
	x3.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false, x3

}
复制代码

TServer接收到rpc请求之后,调用TProcessorprocess进行处理。 TProcessorprocess首先调用TTransport.readMessageBegin接口,读出rpc调用的名称和rpc调用类型。

如果rpc调用类型是rpc call,则调用TProcessor.process_fn继续处理,对于未知的rpc调用类型,则抛出异常。 TProcessor.process_fn根据rpc调用名称,到自己的processMap中查找对应的rpc处理函数。如果存在对应的rpc处理函数,则调用该处理函数继续进行请求响应。不存在则抛出异常。

func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	args := EchoEchoArgs{}
	// 读取入参的参数
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false, err
	}

	iprot.ReadMessageEnd()

	result := EchoEchoResult{}
	var retval *EchoRes
	var err2 error
	// 此处是thrift为什么err不能传错误,如果传业务错误会被阻塞
	if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
		x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true, err2
	} else {
		result.Success = retval
	}

	if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
		err = err2
	}

	if err2 = result.Write(oprot); err == nil && err2 != nil {
		err = err2
	}
	if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
		err = err2
	}

	if err2 = oprot.Flush(); err == nil && err2 != nil {
		err = err2
	}
	if err != nil {
		return
	}
	return true, err
}
复制代码

服务端stop代码

var once sync.Once
func (p *TSimpleServer) Stop() error {
	q := func() {
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	}
	once.Do(q)
	return nil
}
复制代码

stop函数比较简单,可以看出直接向阻塞队列里面写入数据,然后server不再接受请求

客户端代码

Client调用的函数

func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {

	if err = p.sendEcho(req); err != nil {
		return
	}

	return p.recvEcho()
}
复制代码

sendEcho()函数

func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	oprot := p.OutputProtocol
	if oprot == nil {
		oprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = oprot
	}
	// seqid + 1
	p.SeqId++

	if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
		return
	}

	// 构建参数
	args := EchoEchoArgs{
		Req: req,
	}

	if err = args.Write(oprot); err != nil {
		return
	}
	// 通知服务器发送完毕
	if err = oprot.WriteMessageEnd(); err != nil {
		return
	}
	return oprot.Flush()
}
复制代码

recvEcho()函数

func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}
	//
	method, mTypeId, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return
	}
	if method != "echo" {
		err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
		return
	}
	if p.SeqId != seqId {
		err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
		return
	}
	if mTypeId == thrift.EXCEPTION {
		error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
		var error1 error
		error1, err = error0.Read(iprot)
		if err != nil {
			return
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return
		}
		err = error1
		return
	}
	if mTypeId != thrift.REPLY {
		err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
		return
	}
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return
	}
	value = result.GetSuccess()
	return
}
复制代码

thrift在mac机器安装问题

  • 问题1:go get git.apache.org/thrift.git/lib/go/thrift失败
  • 问题2:直接使用github.com提供的版本会报未知错误

问题2需要根据你的thrift -version来判断下载哪一个版本的thrift,比如我的thrift版本是0.10.0那么需要下载的thrift地址为https://github.com/apache/thrift/archive/0.10.0.zip

手动创建mkdir -p git.apache.org/thrift.git/lib/go/目录,然后将下载后的go文件移至该目录下~

Reference

关注下面的标签,发现更多相似文章
评论