nodejs微服务框架解决方案

13,207 阅读13分钟

前言

seneca是一个nodejs微服务工具集,它赋予系统易于连续构建和更新的能力。下面会逐一和大家一起了解相关技术入门以及实践。

这里插入一段硬广。小子再进行简单整合之后撸了个vastify框架 ---- 轻量级nodejs微服务框架,有兴趣的同学过目一下,欢迎顺手star一波,另外有疑问或者代码有毛病欢迎在博文下方留言。

环境

  • 基础环境
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
  • 微服务工程
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"

FEATURES

  • 模式匹配做服务间调用:略微不同于SpringCloud服务发现(http协议、IP + PORT模式),它使用更加灵活的模式匹配(Patrun模块)原则去进行微服务间的调用
  • 接入koa2对C端提供RESTFUl API
  • 插件:更灵活编写小而微的可复用模块
  • seneca内置日志输出
  • 第三方日志库比较winston(选用)、bunyan、log4js
  • RabbitMQ消息队列
  • PM2:node服务部署(服务集群)、管理与监控
  • PM2:自动化部署
  • PM2集成docker
  • 请求追踪(重建用户请求流程)
  • 梳理Consul 服务注册与发现基本逻辑
  • 框架集成node-consul
  • mongodb持久化存储
  • 结合seneca与consul的路由服务中间件(可支持多个相同名字服务集群路由,通过?version区别)
  • 支持流处理(文件上传/下载等)
  • jenkins自动化部署
  • nginx负载均衡
  • 持续集成方案
  • redis缓存
  • prisma提供GraphQL接口

模式匹配(Patrun模块)

index.js(accout-server/src/index.js)

const seneca = require('seneca')()

seneca.use('cmd:login', (msg, done) => {
	const { username, pass } = msg
	if (username === 'asd' && pass === '123') {
		return done(null, { code: 1000 })
	}
	return done(null, { code: 2100 })
})

const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: 'seneca' })

act({
	cmd: 'login',
	username: 'asd',
	pass: '123'
}).then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

执行后

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}

seneca.add方法,添加一个action pattern到Seneca实例中,它有三个参数:

  1. pattern: 用于Seneca中JSON的消息匹配模式,对象或格式化字符串
  2. sub_pattern: 子模式,优先级低于主模式(可选)
  3. action: 当匹配成功后的动作函数

seneca.act方法,执行Seneca实例中匹配成功的动作,它也有两个参数:

  1. msg: JSON消息
  2. sub_pattern: 子消息,优先级低于主消息(可选)
  3. response: 用于接收服务调用结果

seneca.use方法,为Seneca实例添加一个插件,它有两个参数:(此处插件的原理和中间件有一些不同)

  1. func: 插件执行方法
  2. options: 插件所需options(可选)

核心是利用JSON对象进行模式匹配。这个JSON对象既包含某个微服务所需要调取另一个微服务的特征,同时也包含传参。和Java微服务发现有些类似不过是用模式代替ip+port,目前为止模式是完全可以实现服务发现功能,但是否更加灵活还有待去挖掘。

所需注意的点

  • 各微服务之间模式需通过设计来区分

启动第一个微服务

index.js(config-server/src/index.js)

const seneca = require('seneca')()
const config = {
SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: '服务端正常响应'
}}

seneca.add('$target$:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)

运行此脚本后可在浏览器中输入http://localhost:10011/act?cmd=config发起请求获取全局配置信息

OR

const seneca = require('seneca')()
const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client(10011)
act('?target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

对内:多个微服务相互调用(关键)

noname-server

const seneca = require('seneca')()
seneca.add('?target:account-server', (msg, done) => {
	done(null, { seneca: '666' })
})
seneca.listen(10015)

config-server(同上)

call

const seneca = require('seneca')()
const Promise = require('blurebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client({
	port: '10011',
	pin: '?target:account-server'
})
seneca.client({
	port: '10015',
	pin: '?target:noname-server'
})

act('?target:account-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

act('?target:noname-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

对外:提供REST服务(关键)

集成koa

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// 初始化用户模块
seneca.use(userModule.init)

// 初始化seneca-web插件,并适配koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// 将routes导出给koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)

user模块

const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

const db = {
  users: [{
    id: 1,
    name: '甲'
  }, {
    id: 2,
    name: '乙'
  }, {
    id: 3,
    name: '丙'
  }]
}

function user(options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })
  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })
  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })
  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}

vscode-restclient(vscode的restclient插件,用于发起RESTFUL请求)

### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "测试添加用户"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "测试修改用户信息"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1

seneca内置日志输出

可在构造函数中传入配置,log属性可以控制日志级别

例1:传字符串

require('seneca')({
	// quiet silent any all print standard test
	log: 'all'
})

例2:传对象

require('seneca')({
	log: {
		// none debug+ info+ warn+
		level: 'debug+'
	},
	// 设置为true时,seneca日志功能会encapsulate senecaId,senecaTag,actId等字段后输出(一般为两字符)
	short: true
})

建议例2代码,因为seneca-web-adapter-koa2插件打印的日志level为debug,利于做web接口访问日志记录。

winston日志模块

传送门

Logger.js

const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({label: 'microservices'}),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [ new transports.Console() ]
})

// highest to lowest
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger

日志输出格式

2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客户端的调用请求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message

RabbitMQ消息队列服务

1. 单任务单consumer,生产者消费者模式

producer.js

// 创建一个amqp对等体
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'

    // 为方式RabbitMQ退出或者崩溃时重启后丢失队列信息,这里配置durable:true(同时在消费者脚本中也要配置durable:true)后,
    ch.assertQueue(q, { durable: true })
    // 这里配置persistent:true,通过阅读官方文档,我理解为当程序重启后,会断点续传之前未send完成的数据消息。(但此功能并不可靠,因为不会为所有消息执行同步IO,会缓存在cache并在某个恰当时机write到disk)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close(); process.exit(0)
    }, 100)
  })
})
// 创建一个amqp对等体
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'

    // 为方式RabbitMQ退出或者崩溃时重启后丢失队列信息,这里配置durable:true(同时在消费者脚本中也要定义durable:true)后,
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // noAck配置(默认为false)表明consumer是否需要在处理完后反馈ack给producer,如果设置为true,则RabbitMQ服务如果将任务send至此consumer后不关心任务实际处理结果,send任务后直接标记已完成;否则,RabbiMQ得到ack反馈后才标记为已完成,如果一直未收到ack默认会一直等待ack然后标记,另外如果接收到nack或者该consumer进程退出则继续dispatcher任务
  })
})

检验过程

  • 执行rabbitmqctl list_queues查看当前队列
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
  • node producer.js(rabbitMQ执行过程为会先创建一个匿名exchange,一个指定queue然后将queue与该匿名exchange绑定)

  • rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1
  • node consumer.js
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0

知识点

  • 生产者消费者模式(一个生产者的消息在同一时间只交由一个消费者处理)
  • ACK机制(rabbitmq的确认机制)
  • 创建队列{durable:true}以及向队列发送消息{persistent:true}(消息持久化存储,但不完全能保证,比如当某消息未从缓存中写到磁盘中而程序崩溃时则会丢失)
  • Round-robin Dispatch(公平分发)
  • 处理窗口控制(prefetch来控制分发窗口)
  • 异步多任务处理机制(比如一个大任务分解,分而治之)
  • 整个消息流流程(某个生产者进程 -> 匿名exchange -> 通过binding -> 指定queue -> 某一个消费者进程)

2. 单任务多consumer,发布/订阅模式(全消息模型)

publisher.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'

    // ex为exchange名称(唯一)
    // 模式为fanout
    // 不对消息持久化存储
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 第二个参数为指定某一个binding,如为空则由RabbitMQ随机指定
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })

  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})

subscriber.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'

    // ex -> exchange是发布/订阅消息的载体,
    // fanout -> 分发消息的模式,fanout,direct,topic,headers
    // durable设置为false降低一些可靠性,提高性能,因为不需要磁盘IO持久化存储消息,另外
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 使用匿名(也就是RabbitMQ自动生成随机名的queue)队列
    // exclusive设置为true,即可以当其寄生的connection被close的时候自动deleted
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // 绑定队列到某个exchange载体(监听某个exchange的消息)
      // 第三个入参为binding key
      ch.bindQueue(q.queue, ex, '')
      // 消费即订阅某个exchange的消息并设置处理句柄
      // 因为发布/订阅消息的模式就是非可靠性,只有当订阅者订阅才能收到相关的消息而且发布者不关心该消息的订阅者是谁以及处理结果如何,所以这里noAck会置为true
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

检验过程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使用的queues、echanges、bindings)

node subscriber.js

[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []

node publisher.js tasks.........

[x] Send tasks......... // publiser.js

[x] tasks......... // subscriber.js

知识点

  • 发布/订阅模式(发布者将消息以一对多的形式发送给订阅者处理)
  • noAck(此模式下推荐用非Ack机制,因为发布者往往不需要订阅者如何处理消息以及其结果)
  • durable:false(此模式下推荐不需要做数据持久化存储,原因如上)
  • exchange的工作模式(即路由类型,fanout,direct,topic,headers等,下节会讲解到)
  • 整个消息流流程(某个发布者进程 -> 指定exchange -> 通过binding以及工作模式 -> 某个或多个匿名queue即订阅者进程)

3. Direct Routing

exchange.js

module.exports = {
  name: 'ex1',
  type: 'direct',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

direct-routing.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {

    ch.assertExchange(ex.name, ex.type, ex.options)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      // 声明一个非匿名queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {

          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

检验过程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使用的queues、echanges、bindings)

node direct-routing.js rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers	headers
ex1	direct
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.topic	topic
	direct
amq.direct	direct
amq.match	headers

node subscriber.js rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue	0
error-queue	0
info-queue	0
warning-queue	0

Listing bindings for vhost /...
	exchange	error-queue	queue	error-queue	[]
	exchange	info-queue	queue	info-queue	[]
	exchange	severity-queue	queue	severity-queue	[]
	exchange	warning-queue	queue	warning-queue	[]
ex1	exchange	error-queue	queue	error	[]
ex1	exchange	info-queue	queue	info	[]
ex1	exchange	severity-queue	queue	severity	[]
ex1	exchange	warning-queue	queue	warning	[]

node publisher.js

 [x] info: 'info logs...'
 [x] error: 'error logs...'
 [x] severity: 'severity logs...'
 [x] warning: 'warning logs...'

知识点

  • 路由key,用于exchange的direct工作模式下消息的路由
  • 每当assertQueue时,该queue会在以queue名称当作路由key绑定到匿名exchange
  • 可用于日志不同级别的log处理

4. Topic Routing

exchange.js

module.exports = {
  name: 'ex2',
  type: 'topic',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

topic-routing.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']

    console.log(' [*] Waiting for logs. To exit press CTRL+C');
    keys.forEach(key => {
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)

        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'

    ch.publish(exchangeConfig.name, key, Buffer.from(msg))

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

检验过程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前测试使用的queues、echanges、bindings)

node topic-routing.js

Listing exchanges for vhost / ...
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.headers	headers
amq.match	headers
ex2	topic
	direct
amq.topic	topic
amq.direct	direct

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
  • node publisher.js "account-server.info" "用户服务测试"
  • node publisher.js "config-server.info" "配置服务测试"
  • node publisher.js "config-server.error" "配置服务出错"
[x] account-server.info:'用户服务测试'
[x] config-server.info:'配置服务测试'
[x] config-server.error:'配置服务出错'

知识点

  • key最长为255字节
  • #可匹配0或多个单词,*可精确匹配1个单词

5. RPC

rpc_server.js

const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'

    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)

    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('接收到rpc客户端的调用请求')
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('不匹配的调用请求')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''

  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: '成功',
      data: {
        uid: uid,
        name: '小强',
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: '传参格式错误'
    }
  }

  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

rpc_client.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'

    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log(`接收到回调的消息啦!`)
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })

    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})

检验过程

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue	0

node rpc_client.js

rpc_client的CLI打印

接收到回调的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小强","sex":1}}

rpc_server的CLI打印

接收到rpc客户端的调用请求
{ uid: 2 }

PM2:node服务部署(服务集群)、管理与监控

pm2官网

启动

pm2 start app.js

  • -w --watch:监听目录变化,如变化则自动重启应用
  • --ignore-file:监听目录变化时忽略的文件。如pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
  • -n --name:设置应用名字,可用于区分应用
  • -i --instances:设置应用实例个数,0与max相同
  • -f --force: 强制启动某应用,常常用于有相同应用在运行的情况
  • -o --output <path>:标准输出日志文件的路径
  • -e --error <path>:错误输出日志文件的路径
  • --env <path>:配置环境变量

pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

在cluster-mode,也就是-i max下,日志文件会自动在后面追加-${index}保证不重复

其他简单且常用命令

pm2 stop app_name|app_id pm2 restart app_name|app_id pm2 delete app_name|app_id pm2 show app_name|app_id OR pm2 describe app_name|app_id pm2 list pm2 monit pm2 logs app_name|app_id --lines <n> --err

Graceful Stop

pm2 stop app_name|app_id

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

当进程结束前,程序会拦截SIGINT信号从而在进程即将被杀掉前去断开数据库连接等等占用内存的操作后再执行process.exit()从而优雅的退出进程。(如在1.6s后进程还未结束则继续发送SIGKILL信号强制进程结束)

Process File

ecosystem.config.js

const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // 不合并日志输出,用于集群服务
  merge_logs: false,
  // 常用于启动应用时异常,超时时间限制
  listen_timeout: 5000,
  // 进程SIGINT命令时间限制,即进程必须在监听到SIGINT信号后必须在以下设置时间结束进程
  kill_timeout: 2000,
  // 当启动异常后不尝试重启,运维人员尝试找原因后重试
  autorestart: false,
  // 不允许以相同脚本启动进程
  force: false,
  // 在Keymetrics dashboard中执行pull/upgrade操作后执行的命令队列
  post_update: ['npm install'],
  // 监听文件变化
  watch: false,
  // 忽略监听文件变化
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig({ name = '', script = '', error_file = '', out_file = '', exec_mode = 'fork', instances = 1, args = "" }) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file|| `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),

    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}

pm2 start ecosystem.config.js

避坑指南:processFile文件命名建议为*.config.js格式。否则后果自负。

监控

请移步app.keymetrics.io

PM2:自动化部署

ssh准备

  1. ssh-keygen -t rsa -C 'qingf deployment' -b 4096
  2. 如果有多密钥、多用户情况,建议配置~/.ssh/config文件,格式类似如下
// 用不同用户对不同远程主机发起ssh请求时指定私钥
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  // 设置为no可去掉首次登陆(y/n)的选择
  StrictHostKeyChecking no
// 别名用法
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no
  1. 将公钥复制到远程(一般为部署服务器)对应用户目录,比如/home/deploy/.ssh/authorized_keys文件(authorized_keys文件权限设置为600)

配置ecosystem.config.js

与上述apps同级增加deploy属性,如下

deploy: {
    production: {
        'user': 'deploy',
        'host': 'qingf.me',
        'ref': 'remotes/origin/master',
        'repo': 'https://github.com/Cecil0o0/account-server.git',
        'path': '/home/deploy/apps/account-server',
        // 生命周期钩子,在ssh到远端之后setup操作之前执行
        'pre-setup': '',
        // 生命周期钩子,在初始化设置即git pull之后执行
        'post-setup': 'ls -la',
        // 生命周期钩子,在远端git fetch origin之前执行
        'pre-setup': '',
        // 生命周期钩子,在远端git修改HEAD指针到指定ref之后执行
        'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
        // 以下这个环境变量将注入到所有app中
        "env"  : {
          "NODE_ENV": "test"
        }
    }
}

tip:please make git working directory clean first!

此处如果不懂或者有疑问,请查阅Demo

然后先后执行以下两条命令**(注意config文件路径)**

  1. pm2 deploy deploy/ecosystem.config.js production setup
  2. pm2 deploy deploy/ecosystem.config.js production

其他命令

pm2 deploy <configuration_file>

  Commands:
    setup                run remote setup commands
    update               update deploy to the latest release
    revert [n]           revert to [n]th last deployment or 1
    curr[ent]            output current release commit
    prev[ious]           output previous release commit
    exec|run <cmd>       execute the given <cmd>
    list                 list previous deploy commits
    [ref]                deploy to [ref], the "ref" setting, or latest tag

推荐shell toolkit

oh my zsh

请求追踪

如何?

  • 在seneca.add以及seneca.act中使用seneca.fixedargs['tx$']值作为traceID标识处于某一条请求流程。另外seneca内置log系统会打印此值。

疑问?

seneca内置log系统如何做自定义日志打印?

温馨提示:请以正常的http请求开始,因为经过测试如果微服务自主发起act,其seneca.fixedargs['tx$']值不同。

Consul 服务注册与发现

Consul是一个分布式集群服务注册发现工具,并具有健康检查、分级式KV存储、多数据中心等高级特性。

安装

  • 可选择使用预编译的安装包
  • 也可选择克隆源码后编译安装

基础使用

  • 以开发模式快速启动服务模式代理并开启web界面访问http://localhost:8500

consul agent -dev -ui

  • 编写服务定义文件
{
  "service": {
	// 服务名,稍后用于query服务
    "name": "account-server",
	// 服务标签
    "tags": ["account-server"],
	// 服务元信息
    "meta": {
      "meta": "for my service"
    },
	// 服务端口
    "port": 3333,
	// 不允许标签覆盖
    "enable_tag_override": false,
	// 脚本检测做health checks 与-enable-script-checks=true配合使用,有脚本模式、TCP模式、HTTP模式、TTL模式
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}
  • query定义的account-server服务

curl http://localhost:8500/v1/catalog/service/account-server

[
    {
        "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
        "Node": "haojiechen.local",
        "Address": "127.0.0.1",
        "Datacenter": "dc1",
        "TaggedAddresses": {
            "lan": "127.0.0.1",
            "wan": "127.0.0.1"
        },
        "NodeMeta": {
            "consul-network-segment": ""
        },
        "ServiceID": "account-server",
        "ServiceName": "account-server",
        "ServiceTags": [
            "account-server"
        ],
        "ServiceAddress": "",
        "ServiceMeta": {
            "meta": "for my service"
        },
        "ServicePort": 3333,
        "ServiceEnableTagOverride": false,
        "CreateIndex": 6,
        "ModifyIndex": 6
    }
]

生产级别使用(分布式集群)

某一个结点启动一个server模式代理,如下

consul agent -server -bootstrap-expect=1 \
	-data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

查看集群成员

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>

另一个结点启动一个client模式代理,如下

consul agent \
	-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

查看集群成员

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

加入Cluster

consul join 139.129.5.228 consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

集成node-consul

config.js

// 服务注册与发现
// https://github.com/silas/node-consul#catalog-node-services
  'serverR&D': {
    consulServer: {
      type: 'consul',
      host: '127.0.0.1',
      port: 8500,
      secure: false,
      ca: [],
      defaults: {
        token: ''
      },
      promisify: true
    },
    bizService: {
      name: 'defaultName',
      id: 'defaultId',
      address: '127.0.0.1',
      port: 1000,
      tags: [],
      meta: {
        version: '',
        description: '注册集群'
      },
      check: {
        http: '',
        // check间隔时间(ex: 15s)
        interval: '10s',
        // check超时时间(ex: 10s)
        timeout: '2s',
        // 处于临界状态后自动注销服务的超时时间
        deregistercriticalserviceafter: '30s',
        // 初始化状态值为成功
        status: 'passing',
        // 备注
        notes: '{"version":"111","microservice-port":1115}'
      }
    }
  }

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description 微服务注册方法
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')

// 注册服务

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.?version) throw new Error('meta.?version is invalid!')
  if (!bizService.meta.?microservicePort) throw new Error('meta.?microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 检查主机+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注册集群服务
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务已注册`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

验证

保证runtime中存在consul和mongodb服务后,clone该仓库Demo,cd到工程根目录下,运行node src即可。

框架集成node-consul

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description 微服务注册方法
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// 注册服务方法定义

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.?version) throw new Error('meta.?version is invalid!')
  if (!bizService.meta.?microservicePort) throw new Error('meta.?microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 检查主机+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注册集群服务
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务注册成功`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

account-server/src/index.js

const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// 注册服务
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      ?version: version,
      ?microservicePort: microservicePort
    }
  }
})

Mongodb持久化存储

  • 框架使用mongoose做mongoClient,当然你也可以选用原生nodejs mongoClient。

改造之前的user模块,偷个懒就不贴代码了,具体请查看Demo

结合seneca以及consul的路由服务中间件

microRouting.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description 微服务内部路由中间件,暂不支持自定义路由匹配策略
 */

'use strict'

const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description 同步consul服务中心的所有可用服务以及对应check并组装成对象以方便取值
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = '未发现可用服务'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule(senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let ?addr = service.Address
    let ?microservicePort = ''
    let ?version = ''
    try {
      let base = JSON.parse(service.check.Notes)
      ?microservicePort = base.?microservicePort
      ?version = base.?version
    } catch (e) {
      logger.warn(`服务名为${serviceName}。该服务check.Notes为非标准JSON格式,程序已忽略。请检查服务注册方式(请确保调用ServerRegister的register来注册服务)`)
    }

    if (IPV4_REGEX.test(?addr) && isNumber(?microservicePort)) {
      if (service.check.Status === 'passing') {
        senecaInstance.client({
          host: ?addr,
          port: ?microservicePort,
          pin: {
            ?version,
            ?target: name
          }
        })
      } else {
        logger.warn(`${?target}@${?version || '无'}服务处于critical,因此无法使用`)
      }
    } else {
      logger.warn(`主机(${?addr})或微服务端口号(${?microservicePort})有误,请检查`)
    }
  })
}


function startTimeInterval() {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting(consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting

在保证有consul与mongodb的runtime后,请结合这两个config-serveraccount-server Demo进行测试。

[未完待续....]