阅读 300

pika主从复制原理之工作流程

上一篇pika主从复制原理之binlog中介绍了主从复制binlog的元信息、日志的格式及对应的api,本篇介绍下主从复制有关的线程、全量复制过程、增量复制过程。

线程

PikaBinlogReceiverThread: 系统启动时初始化,占用端口port+1000,作为Slave接收Master同步过来的Redis命令。

PikaHeartbeatThread: 系统启动时初始化,占用端口port+2000,作为Master接收Slave发送的ping、spci指令,对所有Slave进行存活检测

PikaTrysyncThread: 系统启动时初始化,无角色,执行slaveof host port命令对应的后台任务包括定期检查是否需要跟某个Master建立连接、db替换、启动或关闭rsync任务等,当启动rsync服务时,占用端口port+3000。

PikaSlavepingThread: 成为某个Master的Slave后,作为Slave的一方初始化,定时向Master发送ping、spci指令,如果超时超过30秒,生成以后关闭Master的后台任务,由PikaBinlogReceiverThread来执行。

BinlogBGWorker: 系统启动时初始化,每个BinlogBGWorker包含一个binlogbg_thread,作为Slave的后台执行模块,接收PikaBinlogReceiverThread的调度、执行具体的redis命令。

PikaBinlogSenderThread: 成为某个Slave的Master后,作为Master的一方初始化,根据slave传过来的filenum、offset消费日志,并发送到PikaBinlogReceiverThread所在的服务端口。
复制代码

类图

  • PikaBinlogReceiverThread

  • PikaBinlogSenderThread

主要API描述

  • Thread
//初始化

int Thread::InitHandle()

//执行线程内的定时任务

void Thread::CronHandle()

//创建线程,调用RunThread()

int Thread::StartThread()

//处理逻辑入口函数

void *Thread::RunThread(void *arg)
复制代码
  • HolyThread
virtual int InitHandle()

1.初始化epoll监听事件,并将套接字加入server_fds

virtual void *ThreadMain()

1.调用CronHandle,检测是否存在需要执行的定时任务,如果有则执行。

2.如果server_fds上存在EPOLLIN事件,则接受连接请求,建立slave-master连接

3.如果是slave-master连接上存在EPOLLIN事件,则调用in_conn->GetRequest()进行具体redis命令处理。

4.如果是slave-master连接上存在EPOLLOUT事件,则调用in_conn->SendReply()发送应答。
复制代码
  • PikaBinlogReceiverThread
//执行后台任务,PikaSlavepingThread检测到应该退出或者主挂掉时,添加Kill PikaBinlogSenderThread后台任务。

void PikaBinlogReceiverThread::CronHandle()
复制代码
  • PikaMasterConn
int PikaMasterConn::DealMessage()

1.如果存在monitor,则将当前命令发到monitor一份(即redis的monitor命令)。

2.生成全局日志序号serial,保证按顺序并发写入slave binlog。

3.如果是只读,则由PikaBinlogReceiverThread完成binlog日志的追加,否则由PikaBinlogReceiverThread分配的BinlogBGWorker完成。
复制代码
  • BinlogBGWorker
void Schedule()

1.第一次调度时启动后台线程。

2.通过DoBinlogBG执行具体的后台任务。

void BinlogBGWorker::DoBinlogBG(void* arg)

1.根据解析的参数获取需要执行的命令。

2.如果slave非只读,则获取一个全局记录锁(应该是用不到的)。

3.记录binlog。

4.执行命令。

5.记录慢日志。
复制代码

Slaveof流程图

  • slave

  • master

  • slaveof过程
1.如果是slaveof no one,停止rsync服务,删除master,repl_state_ = PIKA_REPL_NO_CONNECT,role=master

2.如果slaveof ip port filenum offset,则将filenum之前的binlog删除,同时如果filenum存在,则将offset之前的文件内容填充空格。

3.role更新为slave,repl_state更新为PIKA_REPL_CONNECT,应答成功。

//后台任务部分

4.PikaTrysyncThread检测到repl_state==PIKA_REPL_CONNECT,在端口port+3000启动rsync服务,准备接受master的db内容。

5.slave建立与master的连接,如果有auth,则主动发送auth命令。

6.发送trysync命令,主动要求master进行数据同步。

7.如果master应答结果为kInnerReplWait,则repl_state_ = PIKA_REPL_WAIT_DBSYNC。

8.slave会一直等待,直到db_sync_path目录下存在info文件时,用新的db替换之前的db,根据info中的filenum、offset更新slave对应的filenum、offset,重置repl_state_ = PIKA_REPL_CONNECT

9.再次执行3-6步骤,master应答kInnerReplOk,更新repl_state_ = PIKA_REPL_CONNECTING,停掉rsync服务,创建PikaSlavepingThread,在PikaSlavepingThread ping master成功以后更新repl_state_ = PIKA_REPL_CONNECTED,主从同步正式建立。
复制代码
  • trysync过程
1.根据slave的ip、port构造slave的唯一标识,stage=SLAVE_ITEM_STAGE_ONE。

2.如果master没有找到slave传过来的filenum,则执行bgsave,生成一份db镜像和info信息,通过rsync传给slave,并清理掉master的临时文件

3.根据slave的ip、port,bgsave的filenum、offset构建PikaBinlogSenderThread

4.过滤binlog中可能已经损坏的内容(什么场景可能会用到?可能主从数据不一致吗?)

5.更新role_ = PIKA_ROLE_MASTER,PikaSlavepingThread第二次存活检测时发送spci命令,主根据PikaBinlogSenderThread,更新stage=SLAVE_ITEM_STAGE_TWO,主从同步正式建立。
复制代码

结语

个人认为主从复制是pika里面体量最大也是最复杂的一个模块,通过采用了类似LevelDB的WAL日志的方式,当然由于redis命令的问题,这个日志非幂等的,不是为了在启动时重放,而是解决了redis中增量复制buffer被打满的问题。(基于2.x版本,目前pika已经是3.x版本,主从复制有所变化,先迁移到此处,后续会更新)