阅读 57

rpc之thrift入门与TBinaryProtocol源码追踪

thrift是一个支持多语言进行RPC的软件库,开发者通过定义数据类型和服务接口,经由thrift的代码生成引擎就可以构建RPC客户端和服务端所需要的代码。它的核心组件如下:

  • Types。thrift支持的数据类型,由thrift自身定义,定义的类型是所有编程语言中都会用到的关键类型

  • Transport。做数据传输用,抽象了对网络的一些操作,比如 read/write

  • Protocol。数据经由client和server进行编码或者解码的格式,比如 JSON、binary

  • Versioning。client和server对于接口变动的处理方式,比如server在旧接口上新增了一个字段,但是client还是旧的

  • Processors。服务端收到RPC后的处理逻辑,负责将读到的内容交由server处理,并将结果写回输出流

Type

  1. 支持的基本类型为

    • bool 取值为true/false

    • byte 有符号的1个字节

    • i16 16比特的有符号integer

    • i32 32比特的有符号integer

    • i64 64比特的有符号integer

    • double 64比特的浮点数

    • string 文本字符串

  2. 复杂类型,比如java中的自定义对象,可以通过 struct 来组织

  3. 集合目前支持 list/set/map,可以对应成java的ArrayList/HashSet/HashMap

  4. 异常通过 exception 来标识,类似 struct

  5. 接口定义使用 service 来标识

  6. 枚举使用 enum 来标识

定义实例请戳这里

Thrift接口描述语言(IDL)

Thrift IDL文件是开发者自定义逻辑内容的地方,它会被Thrift 的代码生成器处理成目标语言的代码

以下以java来类比

  1. namespace java paxi.maokitty.verify.service namespace 相当于java中声明了当前文件的包名,java则是表示namespace的适用语言

  2. include 'myException.thrift' include相当于import,引入别的包,以便使用它的内容

  3. typedef myException.myException myException 定义别名,方便写

  4. 1:i32 code 定义字段 code,类型是i32,序号是1

  5. string say(1:string msg)throws(1:myException e) 定义方法say,返回值是string,参数是string,抛出自定义异常 myException(这里是别名)

Transport

  • TTransport 提供了一些网络方法的抽象 open/close/read/write/flush ;

  • TServerTransport 则是负责接受创建连接 open/listen/accept/close

  • TSocket 对TTransport的集成实现,提供sokcet

Protocol

protocol提供了一种将内存数据映射到一种特定结构的机制,thrift的设计是要支持流式协议

  • 数据整个被编码成一系列的字段,每个字段都有一个类型和一个唯一的识别码(序号)

  • structs的结束则会自带一个 STOP 的标记

这种设计方式使得thrift协议能够自我界定,而不需要关心具体的编码方式

当然如果固定格式是一种优势或者流处理不是必须的,可以使用TFrameTransport来支持

Versioning

Versioning在thrift中的实现是通过字段识别码完成的,每个声明的字段前面都会有一个唯一的字段识别码

如果没有人为的添加,thrift自身会从-1开始递减,自动加上

当Server读到数据之后,会根据字段识别码来识别是否有根据定义文件存放了这个字段,如果没有,就会跳过无法识别字段的长度,而不用抛出异常。如果这个字段是有的,而且必须要的,则在真的使用这个字段前,先通过isset检查是否真的存在

新老版本可能存在的情况分析

  1. 添加了新的字段,老的客户端,新的server。此时老的客户端不会发送新的字段,新的server发现字段没有set,就按照过期请求的默认行为处理

  2. 删除了字段,老的客户端,新的server。老的客户端会发送已经删掉的字段,新的server会直接无视

  3. 添加了新的字段,新的客户端,老的server。新客户端会发送新的字段,老的server则是直接无视这个字段,按现有逻辑处理

  4. 删了字段,新的客户端,老的server。建议不要出现这种情况

Processor

对每个service,都会对应的生成一个Processor,它主要是负责从网络得到的数据中提取出内容,然后代理请求真正的方法,再把结果写入输出的Prococol

thrift命令使用


thrift -out ../src/main/java --gen  java:private-members ./myException.thrift

复制代码

-out 指定输出的目录

--gen java:private-members ./myException.thrift 表示目标语言是java,字段使用private

thrift代码

根据thrift的IDL语法,写下自己要实现的函数功能


service DemoService{string say(1:string msg)throws(1:myException e),}

复制代码

完整实例戳这里

以使用TBinaryProtocol为例,指定好端口、创建Transport、发起请求的客户端

  //1:网路请求相关设置
    transport=new TSocket("127.0.0.1",9000,1000);
    //2:传输数据的编码方式
    TProtocol protocol=new TBinaryProtocol(transport);
    //3:建立连接
    transport.open();
    //4:创建客户端
    DemoService.Client client=new DemoService.Client(protocol);
    //5:发起请求
    String say = client.say("i am client");
复制代码

在服务端则对应设置接收请求的端口,然后等待连接的到来

  //1:创建等待连接的serverSocket
    TServerSocket serverSocket=new TServerSocket(9000);
    //2:构建server所需要的参数
    TServer.Args serverArgs=new TServer.Args(serverSocket);
    //3:逻辑处理
    TProcessor processor=new DemoService.Processor<DemoService.Iface>(new DemoServiceImpl());
    //4:解析协议
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.processor(processor);
    //5:组织组件完成功能
    TServer server=new TSimpleServer(serverArgs);
    LOG.info("main server start ... ");
    //6:等待连接到来
    server.serve();
复制代码

可运行的客户端和服务端案例请戳这里

TBinaryProtocol源码追踪

服务端启动后,等待连接的到来

@Trace(
        index = 9,
        originClassName = "org.apache.thrift.server.TSimpleServer",
        function = "public void serve() "
)
public void serve(){
   //...
    Code.SLICE.source("client = serverTransport_.accept();")
            .interpretation("底层就是ServerSocket的accept函数,它将返回的结果封装成TSocket返回");
    //..
    Code.SLICE.source(" processor = processorFactory_.getProcessor(client);\n" +
            "          inputTransport = inputTransportFactory_.getTransport(client);\n" +
            "          outputTransport = outputTransportFactory_.getTransport(client);\n" +
            "          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);\n" +
            "          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);\n" +
            "          while (processor.process(inputProtocol, outputProtocol)) {}")
            .interpretation("processor即thrift根据用户写的代码实现类的processor,其余四个参数则是得到的请求中获取的协议处理器,用来读取数据和返回数据,拿到后交由处理器处理");
    }
复制代码

客户端则是在主动发起请求的时候,按照TBinaryProtocol的协议格式写入数据

Code.SLICE.source("oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));")
        .interpretation("opprot即初始化 DemoService.client 时传入的 TBinaryProtocol ,seqid默认值为0")
        .interpretation("Begin部分的写入首先是按照字节数写入版本、然后是方法名的长度,再是方法名,最后写入序列号,按照特定的规则写入数据");
Code.SLICE.source("args.write(oprot_);")
        .interpretation("负责将参数写入Buffer,它会按照参数的顺序写入,每个参数又是按照类型、序号、值的顺序写入");
//..
Code.SLICE.source("oprot_.getTransport().flush();")
        .interpretation("数据已经写入了缓冲区,把没有写完的数据写入对应的文件描述符");

复制代码

当收到请求后服务端则交由服务端的实现具体的处理逻辑然后再回写内容


Code.SLICE.source("T args = getEmptyArgsInstance();")
     .interpretation("拿到参数的类型,这里就是 say_args");
//..
Code.SLICE.source("args.read(iprot);")
        .interpretation("从say_args的scheme(say_argsStandardScheme)中读取参数");
//..
Code.SLICE.source("TBase result = getResult(iface, args);")
        .interpretation("调用实现类,去执行用户自己写的逻辑,并得到对应的结果");
//...
Code.SLICE.source("oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));" +
        " result.write(oprot);\n" +
        "    oprot.writeMessageEnd();\n" +
        "    oprot.getTransport().flush();")
        .interpretation("开始往返回Stream中写入数据,表明这是对那个方法的返回值,然后写入返回的结果,最后输入socket");

复制代码

源码追踪详细过程请戳这里

TBinaryProtocol源码总结

client会按照字节的写入规则严格的写入和读取。底层通信实际上就是socket,服务端接收到请求后,交由对应用户的实现接口来调用实现类,再将结果写入输出流, 客户端等结果返回后再按照规则读取结果,完成1次rpc的调用

附录

Thrift: Scalable Cross-Language Services Implementation

thrift的类型

IDL语言语法

关注下面的标签,发现更多相似文章
评论