Redis Server 连接管理

970 阅读7分钟

连接初始化

接收连接

Redis 服务支持以下几种连接方式:

  • TCP
  • TLS
  • Unix Socket

收到不同方式的连接请求之后,Redis 会调用启动时绑定的处理函数。TCP 连接使用 acceptTcpHandler 处理,TLS 连接使用 acceptTLSHandler 处理,Unix socket 连接使用 acceptUnixHandler 处理。这几个处理函数最终都会调用通用函数 acceptCommonHandler

acceptCommonHandler 步骤

  1. 检查连接数,如果超过了配置的最大连接数会拒绝请求,并向客户端发送错误信息。最大连接数可以在 redis.conf 的 maxclients 参数修改。
  2. 创建客户端对象
  3. 保存 flag 参数,主要用于之后区分使用 Unix Socket 连接的客户端
  4. 调用客户端连接请求处理函数 clientAcceptHandler,主要用于处理默认保护模式下,拒绝处理没有设置密码的外部设备(非 localhost)连接请求。
// networking.c
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    UNUSED(ip);

    // 1. 连接数检查
    if (listLength(server.clients) >= server.maxclients) {
        // 错误处理
				// ...
        return;
    }

		// 2. 创建客户端对象
    if ((c = createClient(conn)) == NULL) {
        // 错误处理
				// ...				
				return;
    }

		// 3. 保存接收连接的相关参数
    c->flags |= flags;

    // 4. 调用 clientAcceptHandler 处理客户端连接请求
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        // 错误处理
				// ...
        return;
    }
}

创建客户端对象

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

		// 连接初始化
    if (conn) {
				// 1. 将连接设为非阻塞模式
        connNonBlock(conn);
				// 2. 禁用 nagel 算法
        connEnableTcpNoDelay(conn);
				// 3. 设置 TCP keepalive
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);

				// 4. 设置请求处理函数
        connSetReadHandler(conn, readQueryFromClient);
				// 5. 让 conn->private_data 指向 client 对象
        connSetPrivateData(conn, c);
    }
		...

步骤:

  1. 将连接设为非阻塞模式。

    https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/59e60246228f4a4db9fd67bc3cbaf2e7~tplv-k3u1fbpfcp-zoom-1.image

    如果程序 A 不断调用 send 将数据将数据拷贝到内核缓冲区,而应用程序 B 不调用 recv ,则 B 的内核缓冲区被填满后 A 的内核缓冲也会被填满,此时 A 继续调用 send 函数结果与 socket 模式有关

    • 阻塞模式:继续调用 send/recv 时会阻塞在调用处
    • 非阻塞模式:立即出错并退出,得到错误码 EWOULDBLOCK 或 EAGAIN
  2. 设置 TCP_NODELAY,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。

  3. 设置 TCP keepalive,作用如下:

    1. 检测因服务停止、网络波动、宕机、应用重启等原因挂掉的连接
    2. 防止因为网络不活动而断连(使用NAT代理或者防火墙的时候,经常会出现这种问题)
    3. TCP层面的心跳检测
  4. 将请求处理函数设为 readQueryFromClient ,用于解析和处理客户端发来的请求命令。

  5. conn->private_data 指向 client 对象,使 client 对象与 conn 对象相互引用

		// 初始化 client 属性
    // ...
		// 

		// 6. 保存 client 对象
		if (conn) linkClient(c);
		// 7. 初始化 MULTI/EXEC 相关的参数
    initClientMultiState(c);
    return c;
}
  1. linkClient 保存 client 对象

    1. 将 client 对象存到双向链表 server.clients 尾部的节点
    2. server.clients 尾部节点保存到 client 对象的 client_list_node 字段
    3. 反转 client id 字节序,将转换后的 id 作为 key,client 对象作为 value,保存到基数树 server.clients_index。当后续需要通过 id 获取 client 对象时会(例如 CLIENT UNBLOCK 命令)从基数树中查询。

    反转 client id 字节序时使用 memrev64 函数,先将 64 位的 unsigned int 转换成 char*,然后在 char* 内部交换字符的顺序:

    void memrev64(void *p) {
        unsigned char *x = p, t;
    
        t = x[0];
        x[0] = x[7];
        x[7] = t;
        t = x[1];
        x[1] = x[6];
        x[6] = t;
        t = x[2];
        x[2] = x[5];
        x[5] = t;
        t = x[3];
        x[3] = x[4];
        x[4] = t;
    }
    

    对于长整型数据的映射,利用基数树可以根据一个长整型快速查找到其对应的对象指针。避免了使用 hash 映射 hash 函数难以设计,不恰当的 hash 函数可能增大冲突,或浪费空间。

    https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/04cb1ab60dbd41deb14c43e4c6ae4553~tplv-k3u1fbpfcp-zoom-1.image

基数树可视化工具:www.cs.usfca.edu/~galles/vis…

执行 createClient 命令时支持传入 NULL,是因为 redis 中所有命令的执行都依赖一个 client 上下文,但是在 Lua 解释器中执行脚本等情况下并没有活跃的连接,因此需要用到 conn 为 NULL 的 client。

conn 为 NULL 的 client 不会被添加到 server.clientsserver.clients_index

事件处理

读事件

当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。

  • 当客户端只是连接到服务器,但并没有向服务器发送命令时,读事件就处于等待状态
  • 当客户端给服务器发送命令请求,并且请求已到达时,该客户端的读事件处于就绪状态

写事件

当服务器有命令结果要传回给客户端时, 会为客户端关联写事件, 在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。

  • 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态
  • 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态

当出现读事件和写事件同时就绪的情况时, 优先处理读事件

client 对象与事件循环

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/138e697f0c6b429d9b315ae6f1ff1fa3~tplv-k3u1fbpfcp-zoom-1.image

Redis server 启动时,会在全局对象 aeEventLoop 中使用 eventsfired 两个字段保存了事件相关的对象:

  • events:保存注册的事件
  • fired:保存触发的事件

另外当事件的到来,就将所有就绪的事件从内核事件表中复制到 apidata→events

events, fired, apidata→events 数组的大小相同,下标是 clientfd

当 clientfd 的读事件触发后,redis server 执行 connSocketSetReadHandler 函数,然后触发 aeCreateFileEvent 函数

// 读事件调用 aeCreateFileEvent 时参数依次为 
// * 全局 eventLoop 对象
// * client fd
// * AE_READABLE
// * connSocketEventHandler 函数
// * connection 对象
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // ...

		// 读取 redis server 初始化时预留的空 aeFileEvent
    aeFileEvent *fe = &eventLoop->events[fd];

		// aeApiAddEvent 会根据 OS 分别调用 select, epoll, kqueue, evport 4 种实现
		// 其中会生成一个 epoll_event 对象 ee,将 EPOLLIN 添加到 ee.events,并将 clientfd 
		// 添加到 ee.data.fd
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
		// 设置读事件的回调函数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

epoll 的 aeApiAddEvent 实现如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
		
		// 判断是新增还是修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;

		// 保留原有的 mask
    mask |= eventLoop->events[fd].mask;

    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

		// 将 clientfd 添加到 epoll_event 的 data
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

事件循环中aeApiPoll 调用 epoll_waitepoll_wait 将触发的事件复制到 apidate->events ,然后由 aeApiPoll 中的逻辑将本次触发事件的序号作为数组下标,将 fd、事件掩码记录到 eventLoop->fired 数组对应的位置上。

aeProcessEvents 在 aeApiPoll 返回后遍历 eventLoop->fired 数组,取出有效的数组元素,得到有事件的 fd 和事件掩码 mask

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    // tvp 是后续定时任务允许等待的最大事件

		aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

		// epoll_wait 将所有就绪的事件从内核事件表中复制到 state->events 
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    if (retval > 0) {
        int j;

        numevents = retval;
				// 将触发的事件复制到 eventLoop->fired
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0fd48f5b01d64dc5be888360ae7dfdf9~tplv-k3u1fbpfcp-zoom-1.image

clientfd 为下标从 eventLoop->events 中取出 aeFileEvent 对象,然后通过 aeFileEvent 的 clientData 取出 connection 对象,进而通过 connection 对象的 private_data 得到 client 对象,用于后续处理。