在 Node.js 中提供 gRPC 服务

3,802 阅读4分钟

引子

Web APP 的大行其道导致了 API 架构的流行,大量站点及服务使用基于 HTTP 1.1 的API 进行交互。这一类文本传输型 API 的优点很突出:易于编写和理解;支持异构平台的沟通。缺点也很明显:基于文本从而导致API传输内容过于庞大;存在客户端易感知的延迟。

如果对性能有所要求,不妨试试基于二进制传输的RPC框架,比如:

gRPC

gRPC 是一个高性能、开源的、通用的、面向移动端的 RPC 框架,传输协议基于 HTTP/2,这意味着它支持 双向流、流控、头部压缩、单 TCP 连接上的请求多路复用 等特性。

接口层面,gRPC默认使用 Protocol Buffers (简称 protobuf)做为其接口定义语言(IDL)来描述其服务接口和负载消息的格式。

grpc-accross-lang-and-platforms

gRPC目前提供的语言支持有:C++, Node.js, Python, Ruby, Objective-C, PHP, C#。

与 Node.js 集成

接口定义

protobuf 做为IDL的特点是语义良好、数据类型定义完备,当前语言版本分为 proto2proto3 。

protobuf 的接口定义大致分为几个部分:

  1. IDL版本 proto2/3
  2. 包名字
  3. 服务定义 和 方法定义
  4. 消息定义:请求消息和响应消息

此处,我们用 proto3 定义一个 testPackage 包中的 testService 服务,它仅提供一个 ping 方法,返回结果放在 message 字段中:

// test.proto

syntax = "proto3";

package testPackage;

service testService {
  rpc ping (pingRequest) returns (pingReply) {}
}

message pingRequest {
}

message pingReply {
  string message = 1;
}

Demo 版本

服务端

以 Node.JS 为例,我们使用 grpc 包,以动态加载 .proto 方式来提供 RPC 服务:

JavaScript
import grpc from 'grpc'

const PROTO_PATH = __dirname + '../protos/test.proto'
const testProto = grpc.load(PROTO_PATH).testPackage

function test(call, callback) {
  callback(null, {message: 'Pong'})
}

const server = new grpc.Server();
server.addProtoService(testProto.testService.service, {test: test})
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure())
server.start()

需要注意的是:此处的代码是在内网测试,使用的是 ServerCredentials.createInsecure() 创建的非安全连接。如果是在公网提供 RPC 服务,请参考鉴权手册选择适合的方案: www.grpc.io/docs/guides…

客户端

客户端亦使用动态加载 .proto 接口定义的方式来进行服务调用(暗含的意思是,这个 .proto 文件需要服务端和客户端同时拥有,并且版本要一致,如果接口有升级,要确保双方拥有的  .proto 版本能同步更新):

JavaScript
import grpc from 'grpc'

const PROTO_PATH = __dirname + '../protos/test.proto'
const testProto = grpc.load(PROTO_PATH).testPackage

const client = new testProto.testService('0.0.0.0:50051',
                                       grpc.credentials.createInsecure());

client.ping({}, function(err, response) {
    console.log('ping -> :', response.message);
});

 

优化版本

上边的Demo版本仅是个可运行的玩具,在工程实践中,我们会有一些额外的需求:

  • .proto 文件置于指定文件夹,自动加载
  • 包和服务名称不需要硬编码,全由调用端动态指定
  • 可同时暴露/调用 多个包的多个RPC端点
  • ……

基于此,有了优化后的动态版本:

服务端

rpcServer.js:

JavaScript
import grpc from 'grpc'

class RpcServer {
  constructor(ip, port) {
    this.ip = ip
    this.port = port
    this.services = {}
    this.functions = {}
  }

  // 自动加载proto并且运行Server
  autoRun(protoDir) {
    fs.readdir(protoDir, (err, files) => {
      if (err) {
        return logger.error(err)
      }
      R.forEach((file) => {
        const filePart = path.parse(file)
        const serviceName = filePart.name
        const packageName = filePart.name
        const extName = filePart.ext
        const filePath = path.join(protoDir, file)

        if (extName === '.js') {
          const functions = require(filePath).default
          this.functions[serviceName] = Object.assign({}, functions)
        } else if (extName === '.proto') {
          this.services[serviceName] = 
            grpc.load(filePath)[packageName][serviceName].service
        }
      }, files)

      return this.runServer()
    })
  }

  runServer() {
    const server = new grpc.Server()

    R.forEach((serviceName) => {
      const service = this.services[serviceName]
      server.addProtoService(service, this.functions[serviceName])
    }, R.keys(this.services))

    server.bind(`${this.ip}:${this.port}`, 
                grpc.ServerCredentials.createInsecure())
    server.start()
  }
}

export default RpcServer

server.js 这么用它:

JavaScript
logger.info('Starting RPC Server:')
const rpcServer = new RpcServer('0.0.0.0', 50051)
rpcServer.autoRun(path.join(__dirname, '../protos/'))
客户端

rcpClient.js:

JavaScript
import grpc from 'grpc'

class RpcClient {
  constructor(ip, port) {
    this.ip = ip
    this.port = port
    this.services = {}
    this.clients = {}
  }

  // 自动加载proto并且connect
  autoRun(protoDir) {
    fs.readdir(protoDir, (err, files) => {
      if (err) {
        return logger.error(err)
      }
    return files.forEach((file) => {
        const filePart = path.parse(file)
        const serviceName = filePart.name
        const packageName = filePart.name
        const extName = filePart.ext
        const filePath = path.join(protoDir, file)

        if (extName === '.proto') {
          const proto = grpc.load(filePath)
          const Service = proto[packageName][serviceName]
          this.services[serviceName] = Service
          this.clients[serviceName] = new Service(`${this.ip}:${this.port}`,
            grpc.credentials.createInsecure())
        }
      }, files)
    })
  }

  async invoke(serviceName, name, params) {
    return new Promise((resolve, reject) => {
      function callback(error, response) {
        if (error) {
          reject(error)
        } else {
          resolve(response)
        }
      }

      params = params || {}
      if (this.clients[serviceName]
        && this.clients[serviceName][name]) {
        this.clients[serviceName][name](params, callback)
      } else {
        const error = new Error(
          `RPC endpoint: "${serviceName}.${name}" does not exists.`)
        reject(error)
      }
    })
  }

export default RpcClient

业务调用示例:

JavaScript
logger.info('RPC Client connecting:')
const rpcClient = new RpcClient(config.grpc.ip, config.grpc.port)
rpcClient.autoRun(path.join(__dirname, '../protos/'))

try {
    // expected: Pong
    const result = await rpcClient.invoke('testService', 'ping')
} catch (err) {
    logger.error(err)
}

单元测试

JavaScript
import { RpcClient } from '../components'

describe('one million RPCs', () => {
  before(() => {
    logger.info('RPC Client connecting:')
    global.rpcClient = new RpcClient(config.grpc.ip, config.grpc.port)
    rpcClient.autoRun(path.join(__dirname, '../protos'))
  })

  it('should not failed', async(done) => {
    const startTime = Date.now()
    const times = 1000000

    for(let i = 0; i < times; i++) {
      console.log(i)
      const respone = await rpcClient.invoke('testService', 'ping')
      respone.message.should.be.equal('Pong')
    }

    const total = Date.now() - startTime
    print('total(MS):', total)
    done()
  })
})

测试结果为单请求毫秒级响应:

Shell
999992
999993
999994
999995
999996
999997
999998
999999
total(MS): 1084334
    ✓ should not failed (1084338ms)


  1 passing (18m)

当然,这种顺序执行的RPC测试用例并不能准确地反映并发性能,仅供参考。

打赏作者 您的支持将激励我继续创作! 去打赏 支持: 微信支付 支付宝

您的支持将鼓励我们继续创作!

微信支付 支付宝

用 [微信] 扫描二维码打赏

用 [支付宝] 扫描二维码打赏