连接初始化
接收连接
Redis 服务支持以下几种连接方式:
- TCP
- TLS
- Unix Socket
收到不同方式的连接请求之后,Redis 会调用启动时绑定的处理函数。TCP 连接使用 acceptTcpHandler
处理,TLS 连接使用 acceptTLSHandler
处理,Unix socket 连接使用 acceptUnixHandler
处理。这几个处理函数最终都会调用通用函数 acceptCommonHandler
acceptCommonHandler
步骤
- 检查连接数,如果超过了配置的最大连接数会拒绝请求,并向客户端发送错误信息。最大连接数可以在 redis.conf 的
maxclients
参数修改。 - 创建客户端对象
- 保存 flag 参数,主要用于之后区分使用 Unix Socket 连接的客户端
- 调用客户端连接请求处理函数
clientAcceptHandler
,主要用于处理默认保护模式下,拒绝处理没有设置密码的外部设备(非 localhost)连接请求。
// networking.c
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
UNUSED(ip);
// 1. 连接数检查
if (listLength(server.clients) >= server.maxclients) {
// 错误处理
// ...
return;
}
// 2. 创建客户端对象
if ((c = createClient(conn)) == NULL) {
// 错误处理
// ...
return;
}
// 3. 保存接收连接的相关参数
c->flags |= flags;
// 4. 调用 clientAcceptHandler 处理客户端连接请求
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
// 错误处理
// ...
return;
}
}
创建客户端对象
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
// 连接初始化
if (conn) {
// 1. 将连接设为非阻塞模式
connNonBlock(conn);
// 2. 禁用 nagel 算法
connEnableTcpNoDelay(conn);
// 3. 设置 TCP keepalive
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// 4. 设置请求处理函数
connSetReadHandler(conn, readQueryFromClient);
// 5. 让 conn->private_data 指向 client 对象
connSetPrivateData(conn, c);
}
...
步骤:
-
将连接设为非阻塞模式。
如果程序 A 不断调用
send
将数据将数据拷贝到内核缓冲区,而应用程序 B 不调用recv
,则 B 的内核缓冲区被填满后 A 的内核缓冲也会被填满,此时 A 继续调用send
函数结果与 socket 模式有关- 阻塞模式:继续调用
send/recv
时会阻塞在调用处 - 非阻塞模式:立即出错并退出,得到错误码 EWOULDBLOCK 或 EAGAIN
- 阻塞模式:继续调用
-
设置 TCP_NODELAY,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。
-
设置 TCP keepalive,作用如下:
- 检测因服务停止、网络波动、宕机、应用重启等原因挂掉的连接
- 防止因为网络不活动而断连(使用NAT代理或者防火墙的时候,经常会出现这种问题)
- TCP层面的心跳检测
-
将请求处理函数设为
readQueryFromClient
,用于解析和处理客户端发来的请求命令。 -
让
conn->private_data
指向 client 对象,使 client 对象与 conn 对象相互引用
// 初始化 client 属性
// ...
//
// 6. 保存 client 对象
if (conn) linkClient(c);
// 7. 初始化 MULTI/EXEC 相关的参数
initClientMultiState(c);
return c;
}
-
linkClient
保存 client 对象- 将 client 对象存到双向链表
server.clients
尾部的节点 - 将
server.clients
尾部节点保存到 client 对象的client_list_node
字段 - 反转 client id 字节序,将转换后的 id 作为 key,client 对象作为 value,保存到基数树
server.clients_index
。当后续需要通过 id 获取 client 对象时会(例如CLIENT UNBLOCK
命令)从基数树中查询。
反转 client id 字节序时使用
memrev64
函数,先将 64 位的 unsigned int 转换成 char*,然后在 char* 内部交换字符的顺序:void memrev64(void *p) { unsigned char *x = p, t; t = x[0]; x[0] = x[7]; x[7] = t; t = x[1]; x[1] = x[6]; x[6] = t; t = x[2]; x[2] = x[5]; x[5] = t; t = x[3]; x[3] = x[4]; x[4] = t; }
对于长整型数据的映射,利用基数树可以根据一个长整型快速查找到其对应的对象指针。避免了使用 hash 映射 hash 函数难以设计,不恰当的 hash 函数可能增大冲突,或浪费空间。
- 将 client 对象存到双向链表
基数树可视化工具:www.cs.usfca.edu/~galles/vis…
执行 createClient
命令时支持传入 NULL,是因为 redis 中所有命令的执行都依赖一个 client 上下文,但是在 Lua 解释器中执行脚本等情况下并没有活跃的连接,因此需要用到 conn 为 NULL 的 client。
conn 为 NULL 的 client 不会被添加到 server.clients
和 server.clients_index
。
事件处理
读事件
当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。
- 当客户端只是连接到服务器,但并没有向服务器发送命令时,读事件就处于等待状态。
- 当客户端给服务器发送命令请求,并且请求已到达时,该客户端的读事件处于就绪状态
写事件
当服务器有命令结果要传回给客户端时, 会为客户端关联写事件, 在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。
- 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态。
- 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态。
当出现读事件和写事件同时就绪的情况时, 优先处理读事件
client 对象与事件循环
Redis server 启动时,会在全局对象 aeEventLoop 中使用 events
和 fired
两个字段保存了事件相关的对象:
- events:保存注册的事件
- fired:保存触发的事件
另外当事件的到来,就将所有就绪的事件从内核事件表中复制到 apidata→events
events
,fired
,apidata→events
数组的大小相同,下标是 clientfd
当 clientfd 的读事件触发后,redis server 执行 connSocketSetReadHandler
函数,然后触发 aeCreateFileEvent
函数
// 读事件调用 aeCreateFileEvent 时参数依次为
// * 全局 eventLoop 对象
// * client fd
// * AE_READABLE
// * connSocketEventHandler 函数
// * connection 对象
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// ...
// 读取 redis server 初始化时预留的空 aeFileEvent
aeFileEvent *fe = &eventLoop->events[fd];
// aeApiAddEvent 会根据 OS 分别调用 select, epoll, kqueue, evport 4 种实现
// 其中会生成一个 epoll_event 对象 ee,将 EPOLLIN 添加到 ee.events,并将 clientfd
// 添加到 ee.data.fd
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置读事件的回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
epoll 的 aeApiAddEvent 实现如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
// 判断是新增还是修改
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
// 保留原有的 mask
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
// 将 clientfd 添加到 epoll_event 的 data
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
事件循环中aeApiPoll
调用 epoll_wait
, epoll_wait
将触发的事件复制到 apidate->events
,然后由 aeApiPoll
中的逻辑将本次触发事件的序号作为数组下标,将 fd、事件掩码记录到 eventLoop->fired
数组对应的位置上。
aeProcessEvents 在 aeApiPoll 返回后遍历 eventLoop->fired
数组,取出有效的数组元素,得到有事件的 fd 和事件掩码 mask
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
// tvp 是后续定时任务允许等待的最大事件
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// epoll_wait 将所有就绪的事件从内核事件表中复制到 state->events
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// 将触发的事件复制到 eventLoop->fired
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
clientfd 为下标从 eventLoop->events 中取出 aeFileEvent 对象,然后通过 aeFileEvent 的 clientData 取出 connection 对象,进而通过 connection 对象的 private_data 得到 client 对象,用于后续处理。