阅读 529

EggCluster 是如何解决多进程模式下相关问题的

背景

Node 官方提供了 cluster 模块来提供多进程的解决方案,以尽可能提升服务器资源使用效率。

整体而言,在这个问题域里,要解决的子问题有三块

  • 重启机制
  • 负载均衡
  • 状态共享,即通信机制

image.png
(上图总结自《深入浅出Node》第9章)

Egg 作为企业级框架,也针对这些问题,提供了 egg-cluster 模块来做了些增强

通信机制

为什么先讲通信?启动流程要用嘛。

image.png

可以看到,主要的实体包括了 Master、Agent、Worker三个, Master、Agent、Worker,其实三者更多完成的是通信的执行工作,真正提供通信管理能力的是 Manager 和 Messenger。

在看这俩模块之前,可以先看些基础知识

Messenger

Messenger 是一个消息发送器,负责:接收消息 -> 定向转发。

那为什么要单独搞这个模块?

  • 协议格式统一:Agent 和 Worker 都有 exit 和 message 事件
  • 通信方法统一:与 Parent 通信要走 process.send,与 Worker / Agent 通信要走 sendmessage 模块,和Master 走 EventEmitter

它包含两个部分:

  • 信息收集
  • 路由转发

以一个 worker 启动的例子为例。

首先,信息收集,使用的是订阅/通知模式,是以 master 显式调用 messenger 来处理的。

// 在 cluster 启动完毕后,会告知父进程启动成功
const action = 'egg-ready';
this.messenger.send({
  action,
  to: 'parent',
  data: {
    port: this[REAL_PORT],
    address: this[APP_ADDRESS],
    protocol: this[PROTOCOL],
  },
});
复制代码

其次是通过 send 做定向转发,包括两部分

// 路由识别
if (data.to === 'parent') {
  this.sendToParent(data);
  return;
}

// 调用指定方法
sendToParent(data) {
  if (!this.hasParent) {
    return;
  }
  process.send(data);
}
复制代码

更多信息可以,查看这篇文档: Messenger 模块

Manager

Manager 模块比较简单,主要是针对Agent、Worker 提供管理操作。值得一提的是,它的存活检查代码

// agent.status的修改操作在master的onAgentStart中完成
count() {
  return {
    agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
    worker: this.listWorkerIds().length,
  };
}

startCheck() {
  this.exception = 0;
  // 每10秒检查一次
  this.timer = setInterval(() => {
    const count = this.count();
    if (count.agent && count.worker) {
      this.exception = 0;
      return;
    }
    // 如果agent和worker都不符合要求,超过3次就触发exception,master那边收到消息后会退出
    this.exception++;
    if (this.exception >= 3) {
      this.emit('exception', count);
      clearInterval(this.timer);
    }
  }, 10000);
}
复制代码

详见文档:Manager

启动流程

从 npm run dev 开始

先从启动流程入手,来看看 npm run dev 这个命令到底发生了什么。

它其实是执行了 egg-bin 的 lib/cmd/dev.js 文件的 run 方法

// lib/cmd/dev.js 文件
constructor(rawArgv) {
  // 省略其他初始化代码
  this.serverBin = path.join(__dirname, '../start-cluster');
}

* run(context) {
  // 省略参数格式化过程
  yield this.helper.forkNode(this.serverBin, devArgs, options);
}

// start-cluster.js文件,执行框架的 startCluster
require(options.framework).startCluster(options);
复制代码

如果框架是egg,那最后就会执行 egg 的这段代码

exports.startCluster = require('egg-cluster').startCluster;
复制代码

因此最终是执行了,egg-cluster 模块的 index.js

exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};
复制代码

之后的流程不难,但是内容非常细碎,可以去看 启动和退出分析,主要是介绍如何实现下面的流程的

    +---------+           +---------+          +---------+
    |  Master |           |  Agent  |          |  Worker |
    +---------+           +----+----+          +----+----+
         |      fork agent     |                    |
         +-------------------->|                    |
         |      agent ready    |                    |
         |<--------------------+                    |
         |                     |     fork worker    |
         +----------------------------------------->|
         |     worker ready    |                    |
         |<-----------------------------------------+
         |      Egg ready      |                    |
         +-------------------->|                    |
         |      Egg ready      |                    |
         +----------------------------------------->|
复制代码

Agent的平滑重启

首先,在启动Agent的时候就会去注册回调

forkAgentWorker(){
  // 获得agent
  const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
  // 监听退出事件,转发给master
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: {
        code,
        signal,
      },
      to: 'master',
      from: 'agent',
    });
  });
}

constructor(){
	this.on('agent-exit', this.onAgentExit.bind(this));
}
复制代码

接着在 onAgentExit 中去处理重启逻辑

onAgentExit(data) {
  if (this.closed) return;
  // 清理工作
  const agentWorker = this.agentWorker;
  this.workerManager.deleteAgent(this.agentWorker);
  agentWorker.removeAllListeners();

  // 如果已经启动过,就自动重启
  if (this.isStarted) {
    setTimeout(() => {
      this.forkAgentWorker();
    }, 1000);
 
		// 省略一段转发消息给parent的代码
  } else {
    process.exit(1);
  }
}
复制代码

isStarted 标志,是用来记录整体是否启动成功,它在 ready 回调用中被赋值

// 这个ready是由 get-ready模块提供的,主要是解决异步任务注册问题的,便于自由添加启动前的异步任务
this.ready(() => {
	this.isStarted = true;
});
复制代码

Worker的平滑重启

Worker的平滑重启主要是交给 cfork 模块完成的,egg-cluster 中对于exit 事件的监听只是做个转发。

大致思路是通过 cluster 模块去监听 exit 事件 和 disconnect 事件,然后来根据 disableRefork 配置,判断是否要重启,这其中会处理一些重启逻辑

cluster.on('disconnect', function (worker) {
  // API参考:https://nodejs.org/api/cluster.html#cluster_worker_isdead
  var isDead = worker.isDead && worker.isDead();
  if (isDead) {
    // worker has terminated before disconnect
    return;
  }
  // 配置不重启就不会继续进行
  if (worker.disableRefork) {
    return;
  }
	
  // disconnect 用来保存失联的进程,下文会用到
  disconnects[worker.process.pid] = utility.logDate();
 
  // 重启逻辑
  if (allow()) {
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
  	// 省略
  }
});

cluster.on('exit', function (worker, code, signal) {
  var isExpected = !!disconnects[worker.process.pid];

  // 如果已经先响应了disconnect事件,就不用再走后续退出流程了
  if (isExpected) {
    delete disconnects[worker.process.pid];
    // worker disconnect first, exit expected
    return;
  }

  // 类似的判断 disableRefork 的逻辑,省略

  unexpectedCount++;
	
  // 类似的重启逻辑,省略
  
  cluster.emit('unexpectedExit', worker, code, signal);
});
复制代码

负载均衡 Sticky Mode

背景:最早 Session 等状态信息是保存在 Worker 内存里的,所以一旦用户的多次请求打到不同的Worker上的时候,必然会出现登录态失效的问题。

解决方案:通过一定的方式保证同一个用户的请求打到同一个 Worker 上,Sticky Mode 就是为了解决这个问题

:::info 其实查看 egg-bin 的README.md 文件就可以发现其实默认是不启动的,但出于有趣,还是想介绍下。 :::

转发实现

首先,如果启用了 sticky 模式,在 master 当中会分配一个 stickyWorkerPort

// master.js 
detectPorts() {
  return GetFreePort()
		// 省略中间一段设置主端口的代码
    .then(port => {
    if (this.options.sticky) {
      this.options.stickyWorkerPort = port;
    }
  })
}
复制代码

同时,会启动一个内部的 net.Server,用来做消息转发给Worker

if (this.options.sticky) {
  this.startMasterSocketServer(err => {
 		// 省略
  });
}

startMasterSocketServer(cb) {
  
  // 内部 net server
  require('net').createServer({
    pauseOnConnect: true,
  }, connection => {
     // 这段涉及到 TCP_reset_attack,有兴趣可以自查,不介绍
    if (!connection.remoteAddress) {
      connection.destroy();
    } else {
      // 选出一个worker
      const worker = this.stickyWorker(connection.remoteAddress);
      worker.send('sticky-session:connection', connection);
    }
  }).listen(this[REAL_PORT], cb);
}
复制代码

:::info 题外话:为什么这里 listen 不会报端口重复监听?我的理解是,按照这篇文章的介绍,Master 在初次给 Worker 传递 Socket 的时候,才会去启动内部 TCP 服务,比 startMasterSocketServer要晚。 :::

在 worker 当中,如果是有 配置 sticky,就会使用该 stickyWorkerPort 端口进行监听,同时只监听 父进程(也就是master)转发过来的 sticky-session:connection消息

if (options.sticky) {
  server.listen(options.stickyWorkerPort, '127.0.0.1');
  
  process.on('message', (message, connection) => {
    if (message !== 'sticky-session:connection') {
      return;
    }

    server.emit('connection', connection);
    connection.resume();
  });
}
// 省略正常监听的代码
复制代码

这其中有个细节,那如何保证转发的过程中,数据不丢失呢?

:::info 为什么会丢失?因为 net.Socket 是个 Duplex Stream 对象,在 Flowing Mode 下面会自动读取数据,如果 不响应 data 事件,数据就丢了) :::

首先在创建 socket 的时候,开启 pauseOnConnect 选项;

If pauseOnConnect is set to true, then the socket associated with each incoming connection will be paused, and no data will be read from its handle.

其次在接受到 socket 的时候,恢复执行 resume

转发策略

转发是通过 stickyWorker 函数实现的,本质上就是把 remoteAddress 对 Worker 数量取余数,作为索引去 Worker 列表里随机取一个 Worker

stickyWorker(ip) {
    const workerNumbers = this.options.workers;
    // ws是一组pid列表
    const ws = this.workerManager.listWorkerIds();

    let s = '';
    // IP处理:127.0.0.1 -> 127001
    for (let i = 0; i < ip.length; i++) {
      
      // 这个判断可以过滤掉字母和符号,这样就可以兼容IPv4和IPv6
      if (!isNaN(ip[i])) {
        s += ip[i];
      }
    }
    s = Number(s);
    // 取余数
    const pid = ws[s % workerNumbers];
    return this.workerManager.getWorker(pid);
  }

复制代码

背景详见issue:Sticky Mode 引发的问题

其他有趣的小发现

agent/worker 启动方式差异

回去翻代码,可以发现 agent 是 child_process.fork 方法启动的,worker 是通过 cluster 启动的。 猜测可能是如果需要提供一些类似管理页面的本地服务,一般是 agent做,因此agent 要有独立监听端口的能力

如何检测进程真的退出

对进程强制执行process.exit并且用try-catch包裹住,如果有报错,说明是真的退出了。

使用场景:一开始使用 SIGTERM 要求退出(对比 kill -1),后来退出超过一定时间了,使用 SIGKILL 强制退出(类似 kill -9)

function getUnterminatedProcesses(pids) {
  return pids.filter(pid => {
    try {
      // success means it's still alive
      process.kill(pid, 0);
      return true;
    } catch (err) {
      // error means it's dead
      return false;
    }
  });
}
复制代码

小结

EggCluster涉及知识体系大图

image.png

关注下面的标签,发现更多相似文章
评论