引子
Web APP 的大行其道导致了 API 架构的流行,大量站点及服务使用基于 HTTP 1.1 的API 进行交互。这一类文本传输型 API 的优点很突出:易于编写和理解;支持异构平台的沟通。缺点也很明显:基于文本从而导致API传输内容过于庞大;存在客户端易感知的延迟。
如果对性能有所要求,不妨试试基于二进制传输的RPC框架,比如:
gRPC
gRPC 是一个高性能、开源的、通用的、面向移动端的 RPC 框架,传输协议基于 HTTP/2,这意味着它支持 双向流、流控、头部压缩、单 TCP 连接上的请求多路复用 等特性。
接口层面,gRPC默认使用 Protocol Buffers (简称 protobuf)做为其接口定义语言(IDL)来描述其服务接口和负载消息的格式。
gRPC目前提供的语言支持有:C++, Node.js, Python, Ruby, Objective-C, PHP, C#。
与 Node.js 集成
接口定义
protobuf 做为IDL的特点是语义良好、数据类型定义完备,当前语言版本分为 proto2 和 proto3 。
protobuf 的接口定义大致分为几个部分:
- IDL版本 proto2/3
- 包名字
- 服务定义 和 方法定义
- 消息定义:请求消息和响应消息
此处,我们用 proto3 定义一个 testPackage 包中的 testService 服务,它仅提供一个 ping 方法,返回结果放在 message 字段中:
// test.proto
syntax = "proto3";
package testPackage;
service testService {
rpc ping (pingRequest) returns (pingReply) {}
}
message pingRequest {
}
message pingReply {
string message = 1;
}
Demo 版本
服务端
以 Node.JS 为例,我们使用 grpc 包,以动态加载 .proto 方式来提供 RPC 服务:
JavaScriptimport grpc from 'grpc'
const PROTO_PATH = __dirname + '../protos/test.proto'
const testProto = grpc.load(PROTO_PATH).testPackage
function test(call, callback) {
callback(null, {message: 'Pong'})
}
const server = new grpc.Server();
server.addProtoService(testProto.testService.service, {test: test})
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure())
server.start()
需要注意的是:此处的代码是在内网测试,使用的是 ServerCredentials.createInsecure()
创建的非安全连接。如果是在公网提供 RPC 服务,请参考鉴权手册选择适合的方案: www.grpc.io/docs/guides…。
客户端
客户端亦使用动态加载 .proto 接口定义的方式来进行服务调用(暗含的意思是,这个 .proto 文件需要服务端和客户端同时拥有,并且版本要一致,如果接口有升级,要确保双方拥有的 .proto 版本能同步更新):
JavaScriptimport grpc from 'grpc'
const PROTO_PATH = __dirname + '../protos/test.proto'
const testProto = grpc.load(PROTO_PATH).testPackage
const client = new testProto.testService('0.0.0.0:50051',
grpc.credentials.createInsecure());
client.ping({}, function(err, response) {
console.log('ping -> :', response.message);
});
优化版本
上边的Demo版本仅是个可运行的玩具,在工程实践中,我们会有一些额外的需求:
- .proto 文件置于指定文件夹,自动加载
- 包和服务名称不需要硬编码,全由调用端动态指定
- 可同时暴露/调用 多个包的多个RPC端点
- ……
基于此,有了优化后的动态版本:
服务端
rpcServer.js:
JavaScriptimport grpc from 'grpc'
class RpcServer {
constructor(ip, port) {
this.ip = ip
this.port = port
this.services = {}
this.functions = {}
}
// 自动加载proto并且运行Server
autoRun(protoDir) {
fs.readdir(protoDir, (err, files) => {
if (err) {
return logger.error(err)
}
R.forEach((file) => {
const filePart = path.parse(file)
const serviceName = filePart.name
const packageName = filePart.name
const extName = filePart.ext
const filePath = path.join(protoDir, file)
if (extName === '.js') {
const functions = require(filePath).default
this.functions[serviceName] = Object.assign({}, functions)
} else if (extName === '.proto') {
this.services[serviceName] =
grpc.load(filePath)[packageName][serviceName].service
}
}, files)
return this.runServer()
})
}
runServer() {
const server = new grpc.Server()
R.forEach((serviceName) => {
const service = this.services[serviceName]
server.addProtoService(service, this.functions[serviceName])
}, R.keys(this.services))
server.bind(`${this.ip}:${this.port}`,
grpc.ServerCredentials.createInsecure())
server.start()
}
}
export default RpcServer
server.js 这么用它:
JavaScriptlogger.info('Starting RPC Server:')
const rpcServer = new RpcServer('0.0.0.0', 50051)
rpcServer.autoRun(path.join(__dirname, '../protos/'))
客户端
rcpClient.js:
JavaScriptimport grpc from 'grpc'
class RpcClient {
constructor(ip, port) {
this.ip = ip
this.port = port
this.services = {}
this.clients = {}
}
// 自动加载proto并且connect
autoRun(protoDir) {
fs.readdir(protoDir, (err, files) => {
if (err) {
return logger.error(err)
}
return files.forEach((file) => {
const filePart = path.parse(file)
const serviceName = filePart.name
const packageName = filePart.name
const extName = filePart.ext
const filePath = path.join(protoDir, file)
if (extName === '.proto') {
const proto = grpc.load(filePath)
const Service = proto[packageName][serviceName]
this.services[serviceName] = Service
this.clients[serviceName] = new Service(`${this.ip}:${this.port}`,
grpc.credentials.createInsecure())
}
}, files)
})
}
async invoke(serviceName, name, params) {
return new Promise((resolve, reject) => {
function callback(error, response) {
if (error) {
reject(error)
} else {
resolve(response)
}
}
params = params || {}
if (this.clients[serviceName]
&& this.clients[serviceName][name]) {
this.clients[serviceName][name](params, callback)
} else {
const error = new Error(
`RPC endpoint: "${serviceName}.${name}" does not exists.`)
reject(error)
}
})
}
export default RpcClient
业务调用示例:
JavaScriptlogger.info('RPC Client connecting:')
const rpcClient = new RpcClient(config.grpc.ip, config.grpc.port)
rpcClient.autoRun(path.join(__dirname, '../protos/'))
try {
// expected: Pong
const result = await rpcClient.invoke('testService', 'ping')
} catch (err) {
logger.error(err)
}
单元测试
JavaScriptimport { RpcClient } from '../components'
describe('one million RPCs', () => {
before(() => {
logger.info('RPC Client connecting:')
global.rpcClient = new RpcClient(config.grpc.ip, config.grpc.port)
rpcClient.autoRun(path.join(__dirname, '../protos'))
})
it('should not failed', async(done) => {
const startTime = Date.now()
const times = 1000000
for(let i = 0; i < times; i++) {
console.log(i)
const respone = await rpcClient.invoke('testService', 'ping')
respone.message.should.be.equal('Pong')
}
const total = Date.now() - startTime
print('total(MS):', total)
done()
})
})
测试结果为单请求毫秒级响应:
Shell999992
999993
999994
999995
999996
999997
999998
999999
total(MS): 1084334
✓ should not failed (1084338ms)
1 passing (18m)
当然,这种顺序执行的RPC测试用例并不能准确地反映并发性能,仅供参考。
打赏作者 您的支持将激励我继续创作! 去打赏 支持: 微信支付 支付宝您的支持将鼓励我们继续创作!
微信支付 支付宝用 [微信] 扫描二维码打赏
用 [支付宝] 扫描二维码打赏