网络编程 socket programming 学习笔记 (二)高并发

695 阅读15分钟

这篇文章开始,我们会主要讨论如何设计高并发、高性能的网络服务器程序。 通过这部分,期望掌握多路复用、异步I/O、多线程等知识,从而可以实现并发 C10K 以上的高性能网络服务。

I/O 模式

对于一次I/O访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。

  • 阻塞 I/O(blocking IO)
  • 非阻塞 I/O(nonblocking IO)
  • I/O 多路复用( IO multiplexing)
  • 信号驱动 I/O( signal driven IO)
  • 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

Blocking I/O 阻塞 I/O

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

图片来自 java Selector is asynchronous or non-blocking architecture

当用户进程调用了recvfrom这个系统调用,kernel就开始了I/O的第一个阶段:准备数据(对于网络I/O来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

Non-Blocking I/O

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O Multiplexing I/O 多路复用

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

我们可以把标准输入、套接字等都看做 I/O 的一路,多路复用的意思,就是在任何一路 I/O 有“事件”发生的情况下,通知应用程序去处理相应的 I/O 事件,这样我们的程序就变成了“多面手”,在同一时刻仿佛可以处理多个 I/O 事件。

select 函数就是这样一种常见的 I/O 多路复用技术,我们将在后面继续讲解其他的多路复用技术。使用 select 函数,通知内核挂起进程,当一个或多个 I/O 事件发生后,控制权返还给应用程序,由应用程序进行 I/O 事件的处理。

这些 I/O 事件的类型非常多,比如:

  • 标准输入文件描述符准备好可以读。
  • 监听套接字准备好,新的连接已经建立成功。
  • 已连接套接字准备好可以写。
  • 如果一个 I/O 事件等待超过了 10 秒,发生了超时事件。

Asynchronous I/O

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

著名的 Node.js 依赖的 Libuv 就是一个跨平台的异步I/O 库。

区别

blocking和non-blocking的区别

当应用程序调用阻塞 I/O 完成某个操作时,应用程序会被挂起,等待内核完成操作,感觉上应用程序像是被“阻塞”了一样。实际上,内核所做的事情是将 CPU 时间切换给其他有需要的进程,网络应用程序在这种情况下就会得不到 CPU 时间做该做的事情。

非阻塞 I/O 则不然,当应用程序调用非阻塞 I/O 完成某个操作时,内核立即返回,不会把 CPU 时间切换给其他进程,应用程序在返回后,可以得到足够的 CPU 时间继续完成其他事情。

  1. read 总是在接收缓冲区有数据时就立即返回,不是等到应用程序给定的数据充满才返回。当接收缓冲区为空时,阻塞模式会等待,非阻塞模式立即返回 -1,并有 EWOULDBLOCK 或 EAGAIN 错误。

  2. 和 read 不同,阻塞模式下,write 只有在发送缓冲区足以容纳应用程序的输出字节时才返回;而非阻塞模式下,则是能写入多少就写入多少,并返回实际写入的字节数。

  3. 阻塞模式下的 write 有个特例, 就是对方主动关闭了套接字,这个时候 write 调用会立即返回,并通过返回值告诉应用程序实际写入的字节数,如果再次对这样的套接字进行 write 操作,就会返回失败。失败是通过返回值 -1 来通知到应用程序的。

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous I/O 和 asynchronous I/O 的区别

在说明synchronous I/O 和 asynchronous I/O的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
  • An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous I/O做 "I/O operation" 的时候会将process阻塞。按照这个定义,之前所述的blocking I/O,non-blocking I/O,I/O multiplexing都属于synchronous I/O。

有人会说,non-blocking I/O并没有被block啊。这里有个非常"狡猾"的地方,定义中所指的"I/O operation"是指真实的I/O操作,就是例子中的recvfrom这个system call。non-blocking I/O在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous I/O则不一样,当进程发起I/O 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说I/O完成。在这整个过程中,进程完全没有被block。

举个例子


#define MAX_LINE 1024
#define FD_INIT_SIZE 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

//数据缓冲区
struct Buffer {
    int connect_fd;  //连接字
    char buffer[MAX_LINE];  //实际缓冲
    size_t writeIndex;      //缓冲写入位置
    size_t readIndex;       //缓冲读取位置
    int readable;           //是否可以读
};

struct Buffer *alloc_Buffer() {
    struct Buffer *buffer = malloc(sizeof(struct Buffer));
    if (!buffer)
        return NULL;
    buffer->connect_fd = 0;
    buffer->writeIndex = buffer->readIndex = buffer->readable = 0;
    return buffer;
}

void free_Buffer(struct Buffer *buffer) {
    free(buffer);
}

int onSocketRead(int fd, struct Buffer *buffer) {
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i = 0; i < result; ++i) {
            if (buffer->writeIndex < sizeof(buffer->buffer))
                buffer->buffer[buffer->writeIndex++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                buffer->readable = 1;  //缓冲区可以读
            }
        }
    }

    if (result == 0) {
        return 1;
    } else if (result < 0) {
        if (errno == EAGAIN)
            return 0;
        return -1;
    }

    return 0;
}

int onSocketWrite(int fd, struct Buffer *buffer) {
    while (buffer->readIndex < buffer->writeIndex) {
        ssize_t result = send(fd, buffer->buffer + buffer->readIndex, buffer->writeIndex - buffer->readIndex, 0);
        if (result < 0) {
            if (errno == EAGAIN)
                return 0;
            return -1;
        }

        buffer->readIndex += result;
    }

    if (buffer->readIndex == buffer->writeIndex)
        buffer->readIndex = buffer->writeIndex = 0;

    buffer->readable = 0;

    return 0;
}

int main(int argc, char **argv) {
    int listen_fd;
    int i, maxfd;

    struct Buffer *buffer[FD_INIT_SIZE];
    for (i = 0; i < FD_INIT_SIZE; ++i) {
        buffer[i] = alloc_Buffer();
    }

    // 底层实现  fcntl(fd, F_SETFL, O_NONBLOCK);  
    // 调用 fcntl 将监听套接字设置为非阻塞
    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

    fd_set readset, writeset, exset;
    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) {
        maxfd = listen_fd;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        // listener加入readset
        FD_SET(listen_fd, &readset);

        for (i = 0; i < FD_INIT_SIZE; ++i) {
            if (buffer[i]->connect_fd > 0) {
                if (buffer[i]->connect_fd > maxfd)
                    maxfd = buffer[i]->connect_fd;
                FD_SET(buffer[i]->connect_fd, &readset);
                if (buffer[i]->readable) {
                    FD_SET(buffer[i]->connect_fd, &writeset);
                }
            }
        }

        if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
            error(1, errno, "select error");
        }

        if (FD_ISSET(listen_fd, &readset)) {
            printf("listening socket readable\n");
            sleep(5);
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
           int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
            if (fd < 0) {
                error(1, errno, "accept failed");
            } else if (fd > FD_INIT_SIZE) {
                error(1, 0, "too many connections");
                close(fd);
            } else {
                // 把连接套接字设置为非阻塞
                make_nonblocking(fd);
                if (buffer[fd]->connect_fd == 0) {
                    buffer[fd]->connect_fd = fd;
                } else {
                    error(1, 0, "too many connections");
                }
            }
        }

        for (i = 0; i < maxfd + 1; ++i) {
            int r = 0;
            if (i == listen_fd)
                continue;

            if (FD_ISSET(i, &readset)) {
                r = onSocketRead(i, buffer[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = onSocketWrite(i, buffer[i]);
            }
            if (r) {
                buffer[i]->connect_fd = 0;
                close(i);
            }
        }
    }
}

select 函数


int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);

// 返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1


void FD_ZERO(fd_set *fdset);&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;
void FD_SET(int fd, fd_set *fdset);&emsp;&emsp;
void FD_CLR(int fd, fd_set *fdset);&emsp;&emsp;&emsp;
int  FD_ISSET(int fd, fd_set *fdset);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

举个例子


int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: select01 <IPaddress>");
    }
    int socket_fd = tcp_client(argv[1], SERV_PORT);

    char recv_line[MAXLINE], send_line[MAXLINE];
    int n;

    fd_set readmask;
    fd_set allreads;
    // 把所有元素设置成 0
    FD_ZERO(&allreads);
    // 设置allreads[0] 为 1
    FD_SET(0, &allreads);
    // 设置allreads[3] 为 1 
    // 这里面in,out,error 是3个标准接口,分别占用了0到2这几个位置,
    // 所以socket_fd从 3 开始。
    FD_SET(socket_fd, &allreads);

    for (;;) {
        // 重置 回 0 和 3 
        readmask = allreads;
        // 让系统检测 0 和 3 
        int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);

        if (rc <= 0) {
            error(1, errno, "select failed");
        }

        // 如果 网络 sockct 存在数据
        if (FD_ISSET(socket_fd, &readmask)) {
            n = read(socket_fd, recv_line, MAXLINE);
            if (n < 0) {
                error(1, errno, "read error");
            } else if (n == 0) {
                error(1, 0, "server terminated \n");
            }
            recv_line[n] = 0;
            fputs(recv_line, stdout);
            fputs("\n", stdout);
        }


        // 如果 STD IN 存在数据
        if (FD_ISSET(STDIN_FILENO, &readmask)) {
            if (fgets(send_line, MAXLINE, stdin) != NULL) {
                int i = strlen(send_line);
                if (send_line[i - 1] == '\n') {
                    send_line[i - 1] = 0;
                }

                printf("now sending %s\n", send_line);
                size_t rt = write(socket_fd, send_line, strlen(send_line));
                if (rt < 0) {
                    error(1, errno, "write failed ");
                }
                printf("send bytes: %zu \n", rt);
            }
        }
    }
}

初始化后设置好 1 和 3 的数据状态

当 stdin 可读的时候

select 返回的时候相当于内核通知我们套接字有数据可以读了,使用 read 函数不会阻塞, write 也同理。

poll

上文介绍了select 方法是多个 UNIX 平台支持的非常常见的 I/O 多路复用技术,它通过描述符集合来表示检测的 I/O 对象,通过三个不同的描述符集合来描述 I/O 事件 :可读、可写和异常。但是 select 有一个缺点,那就是所支持的文件描述符的个数是有限的。在 Linux 系统中,select 的默认最大值为 1024。

是否有别的 I/O 多路复用技术可以突破文件描述符个数限制呢?接下来介绍一下 poll 函数。


int poll(struct pollfd *fds, unsigned long nfds, int timeout); 

// 返回值:若有就绪描述符则为其数目,若超时则为0,若出错则为-1


struct pollfd {
    int    fd;       /* file descriptor */
    short  events;   /* events to look for */
    short  revents;  /* events returned */
 };
 
 
#define    POLLIN    0x0001    /* any readable data available */
#define    POLLPRI   0x0002    /* OOB/Urgent readable data */
#define    POLLOUT   0x0004    /* file descriptor is writeable */

和 select 函数对比一下,我们发现 poll 函数和 select 不一样的地方就是,在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置;而在 poll 函数里,我们可以控制 pollfd 结构的数组大小,这意味着我们可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。

举个例子


#define INIT_SIZE 128

int main(int argc, char **argv) {
    int listen_fd, connected_fd;
    int ready_number;
    ssize_t n;
    char buf[MAXLINE];
    struct sockaddr_in client_addr;

    listen_fd = tcp_server_listen(SERV_PORT);

    // 初始化pollfd数组,这个数组的第一个元素是listen_fd,其余的用来记录将要连接的connect_fd
    struct pollfd event_set[INIT_SIZE];
    // 期望系统内核监听套接字上的连接建立完成事件
    event_set[0].fd = listen_fd;
    event_set[0].events = POLLRDNORM;

    // 用-1表示这个数组位置还没有被占用
    int i;
    for (i = 1; i < INIT_SIZE; i++) {
        event_set[i].fd = -1;
    }

    for (;;) {
        // 第三个参数 timeout 设置为 -1,表示在 I/O 事件发生之前 poll 调用一直阻塞
        if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) {
            error(1, errno, "poll failed ");
        }

        // 系统监听到建立连接事件
        if (event_set[0].revents & POLLRDNORM) {
            socklen_t client_len = sizeof(client_addr);
            connected_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len);

            //找到一个可以记录该连接套接字的位置
            for (i = 1; i < INIT_SIZE; i++) {
                if (event_set[i].fd < 0) {
                    event_set[i].fd = connected_fd;
                    event_set[i].events = POLLRDNORM;
                    break;
                }
            }

            if (i == INIT_SIZE) {
                error(1, errno, "can not hold so many clients");
            }
            
            // 加速优化
            if (--ready_number <= 0)
                continue;
        }

        for (i = 1; i < INIT_SIZE; i++) {
            int socket_fd;
            if ((socket_fd = event_set[i].fd) < 0)
                continue;
            if (event_set[i].revents & (POLLRDNORM | POLLERR)) {
                if ((n = read(socket_fd, buf, MAXLINE)) > 0) {
                    if (write(socket_fd, buf, n) < 0) {
                        error(1, errno, "write error");
                    }
                } else if (n == 0 || errno == ECONNRESET) {
                    close(socket_fd);
                    event_set[i].fd = -1;
                } else {
                    error(1, errno, "read error");
                }

                if (--ready_number <= 0)
                    break;
            }
        }
    }
}

实质上 select 与 poll 相比较,两者只是编程接口的区别,从内核实现角度来讲,其实本质实现是差不多的,poll 解决了select 有限文件描述字的缺陷,适用的范围更广一些。

epoll

举个例子


#include "lib/common.h"

#define MAXEVENTS 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

int main(int argc, char **argv) {
    int listen_fd, socket_fd;
    int n, i;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;

    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);
    
    // 调用 epoll_create0 创建了一个 epoll 实例
    efd = epoll_create1(0);
    if (efd == -1) {
        error(1, errno, "epoll create failed");
    }

    // 调用 epoll_ctl 将监听套接字对应的 I/O 事件进行了注册,
    // 这样在有新的连接建立之后,就可以感知到。
    // 注意这里使用的是 edge-triggered(边缘触发)
    event.data.fd = listen_fd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
        error(1, errno, "epoll_ctl add listen fd failed");
    }

    /* Buffer where events are returned */
    events = calloc(MAXEVENTS, sizeof(event));

    while (1) {
        n = epoll_wait(efd, events, MAXEVENTS, -1);
        printf("epoll_wait wakeup\n");
        for (i = 0; i < n; i++) {
            if ((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!(events[i].events & EPOLLIN))) {
                fprintf(stderr, "epoll error\n");
                close(events[i].data.fd);
                continue;
            } else if (listen_fd == events[i].data.fd) {
                struct sockaddr_storage ss;
                socklen_t slen = sizeof(ss);
                int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
                if (fd < 0) {
                    error(1, errno, "accept failed");
                } else {
                    make_nonblocking(fd);
                    event.data.fd = fd;
                    event.events = EPOLLIN | EPOLLET; //edge-triggered
                    if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1) {
                        error(1, errno, "epoll_ctl add connection fd failed");
                    }
                }
                continue;
            } else {
                // 处理了已连接套接字上的可读事件,读取字节流,编码后再回应给客户端
                socket_fd = events[i].data.fd;
                printf("get event on socket fd == %d \n", socket_fd);
                while (1) {
                    char buf[512];
                    if ((n = read(socket_fd, buf, sizeof(buf))) < 0) {
                        if (errno != EAGAIN) {
                            error(1, errno, "read error");
                            close(socket_fd);
                        }
                        break;
                    } else if (n == 0) {
                        close(socket_fd);
                        break;
                    } else {
                        for (i = 0; i < n; ++i) {
                            buf[i] = rot13_char(buf[i]);
                        }
                        if (write(socket_fd, buf, n) < 0) {
                            error(1, errno, "write error");
                        }
                    }
                }
            }
        }
    }

    free(events);
    close(listen_fd);
}

Reference