本文最后更新于 2025年8月30日 凌晨
输入与输出
在计算机系统中,由输入设备,网络输入设备,存储设备向内存中加载数据是计算机的输入过程,
而由内存向输出设备,网络设备,存储设备中写入数据是计算机的输出过程。
存储
在Linux系统中,读写过程是通过文件描述符(fd)来实现的。在内核中将标准输入/socket/文件都统一为文件结构
1 2 3 4 5 6
| struct file { struct path f_path; struct inode *f_inode; const struct file_operations *f_op; };
|
再根据具体的结构不同,绑定不同的底层对象,以及操作方法。而向用户态返回的整型数值,是由内核态进程管理的 ftable 中,该文件对象对应的索引。每次申请文件描述符时,内核都会提供最小的可用描述符索引。
1 2 3 4 5 6
| current →files →fdt →fd[文件描述符数值] →具体的file结构
|
阻塞与非阻塞
当进行输入/输出过程时。可以分为两种策略:
因为设备间的读写速率存在差异,导致读取过程,并不能同步且高效的执行。
- 输入/输出 作为必须的执行过程。需要执行完毕后,再执行后续操作。这时如果数据的写入速度较慢, 会导致CPU消耗较多的时间等待数据写入,造成“阻塞”
- 输入/输出 作为非必须的执行过程。当所需读取的数据不必等待数据的读写,而可以直接进行后续工作时,这时可以通过轮询的方式检测对应的数据是否加载完毕,而不一定要持续等待数据的加载过程,实现“非阻塞”
阻塞I/O的实现
阻塞I/O的实现 是 最基础的I/O实现。当用户尝试 使用read() 读取内核缓冲区中的数据时,如果数据没有准备好,则将线程阻塞,等待数据拷贝完成后返回,再进行数据解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| if((sockfd = socket(PF_INET, SOCK_STREAM,0)) == -1){ perror("创建 server socket 失败"); exit(EXIT_FAILURE); } printf("socket success! , sockfd = %d\\n", sockfd);
server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(SERVPORT); server_sockaddr.sin_addr.s_addr = INADDR_ANY;
if((bind(sockfd, (struct sockaddr *)&server_sockaddr,sizeof(struct sockaddr))) == -1){ perror("bind"); exit(EXIT_FAILURE); }
if(listen(sockfd, SOMAXCONN) == -1){ perror("listen"); exit(EXIT_FAILURE); }
socklen_t len = sizeof(client_sockaddr); client_fd = accept(sockfd, (struct sockaddr *)&client_sockaddr, &len); if(client_fd == -1){ perror("accept"); exit(EXIT_FAILURE); } printf("accept success!\\n");
recvbytes = read(client_fd , buf , MAXDATASIZE-1); if(recvbytes == -1){ perror("recv"); exit(EXIT_FAILURE); } else if (recvbytes == 0){ printf("client closed connection\\n"); } else { buf[recvbytes] = '\\0'; printf("received a connection : %s\\n" , buf); }
close(client_fd); close(sockfd);
|
非阻塞I/O的实现
通过设置fcntl(client_fd, F_SETFL, O_NONBLOCK); 将socket设置为非阻塞IO。当尝试读取时,获得的响应是 数据的准备状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| if((sockfd = socket(PF_INET, SOCK_STREAM,0)) == -1){ perror("创建 server socket 失败"); exit(1); }
printf("socket success! , sockfd = %d\\n", sockfd); server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(SERVPORT); server_sockaddr.sin_addr.s_addr = INADDR_ANY;
if((bind(sockfd, (struct sockaddr *)&server_sockaddr,sizeof(struct sockaddr))) == -1){ perror("bind"); exit(-1); } printf("accept success!\\n"); fcntl(client_fd, F_SETFL, O_NONBLOCK); recvbytes = read(client_fd , buf , MAXDATASIZE);
if(recvbytes == -1){ if (errno == EAGAIN){ printf("no data read !!! \\n"); } perror("recv"); exit(1); }
printf("received a connection : %s\\n" , buf); close(sockfd);
|
轮询
轮询就是在用户态持续发起系统调用,检查数据的准备状态,当最终准备就绪后,完成数据的读取
1 2 3 4 5 6 7 8 9 10 11 12 13
| for (;;) { recvbytes = read(client_fd, buf, MAXDATASIZE); if(recvbytes == -1){ if(errno == EAGAIN) continue; perror("recv"); exit(1); } if(recvbytes == 0){ break; } printf("received a connection: %s\\n",buf); }
|
通知
通知实现本质上就是将处于用户态的轮询过程委托给内核态中的结构,在内核态持续检查数据资源的读写状态,当存在状态就绪的资源描述符后,将结果返回给用户态程序,再由用户态程序遍历资源描述符,完成I/O过程。
多路复用
在通知模式下,因为同一个进程一次可以将多个I/O事件交付给底层结构,而不需要考虑用户态轮询检查某个I/O ,如果某个I/O就绪,就会阻塞其他I/O的 读取或连接过程的问题。通过使用底层结构检测就绪状态并通知用户态处理,这样当某一路IO 就绪,可以直接完成对该路IO的处理,其他时间则可以处理其他工作。而这种实现机制被称为“多路复用”
多路复用的实现
多路复用存在多种实现,其核心原理是通过特定的数据结构,将文件描述符的就绪状态交由内核态中的结构管理,当某个描述符就绪时,返回给用户态进程进行处理。主要实现有 select,poll和 epoll
select
select 是一种较早实现的多路复用机制。它定义了以下参数
1
| int select(int nfds , fd_set *readfds, fd_set * writefds, fd_set *exceptfds, struct timeval *timeout);
|
readfds : 受关注的可读事件文件描述符
writefds : 受关注的可写事件文件描述符
exceptfds : 受关注的产生异常的事件文件描述符
fd_set
fd_set 是一种用于描述文件描述符的比特数组。
1 2 3 4
| #define __FD_SETSIZE 1024 typedef struct { unsigned long fds_bits[__FD_SETSIZE/ (8 * sizeof(long))]; } __kernel_fd_set ;
|
void FD_CLR(int fd, fd_set *set) : 设置元素值为0
int FD_ISSET(int fd, fd_set *set) :检查对应位置的设置
void FD_SET(int fd,, fd_set *set) : 设置元素值为1
void FD_ZERO(fd_set *set) : 设置所有元素为0
timeval
1 2 3 4
| struct timeval { long tv_sec ; long tv_usec ; };
|
- 设置成空 : 表示如果没有I/O事件发生,则select 一直等下去
- 设置非零值 : 等待一段时间后,从select阻塞调用中返回
- 设置为0 : 表示根本不等待
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| int main(void){ fd_set rfds ; struct timeval tv ; int retval ; FD_ZERO(&rfds); FD_SET(0,&rfds); tv.tv_sec = 5; tv.tv_usec = 0; retval = select(1,&rfds , NULL,NULL, &tv); printf("come from select().\\n"); if(retval == -1) perror("select()"); else if (FD_ISSET(0,&rfds printf("Data is availiable now.\\n"); } else printf("No data with in five seconds.\\n"); exit(EXIT_SUCCESS); }
|
select 应用场景实现
单客户端
基于socket的io过程,只需要创建socket ,将对应的文件描述符设置到比特数中,持续轮询检查数组中的置位情况,当数据就绪时,对数据进行读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| int main(int argc, char *argv[]){ int sockfd, sendbytes; char buf[MAXDATASIZE]; struct hostent* host; struct sockaddr_in serv_addr; char send_line[MAXDATASIZE], recv_line[MAXDATASIZE]; fd_set rfds ; fd_set readmask ; int retval ; if(argc < 2){ fprintf(stderr , "Please enter the server‘s hostname"); exit(1); } if((host = gethostbyname(argv[1])) == NULL) { perror("gethostbyname"); exit(1); } if((sockfd = socket(PF_INET, SOCK_STREAM,0)) == -1 ){ perror("创建 client socket失败"); exit(1); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(SERVPORT); serv_addr.sin_addr = *((struct in_addr *)host->h_addr); if((connect(sockfd,(struct sockaddr *)&serv_addr, sizeof(struct sockaddr))) == -1){ perror("connect error"); exit(1); } FD_ZERO(&rfds); FD_SET(0, &rfds); FD_SET(sockfd, &rfds); for(;;){ readmask = rfds ; retval = select(sockfd + 1 , &readmask, NULL,NULL,NULL); if(retval == -1) perror("select()"); if(FD_ISSET(STDIN_FILENO, &readmask)){ if(fgets(send_line, MAXDATASIZE ,stdin) != NULL){ if(!strcmp(send_line, "quit\\n")){ break; } int i = strlen(send_line); if (send_line[i - 1] == '\\n'){ send_line[i - 1] = 0; } printf("sending '%s'\\n", send_line); size_t rt = write(sockfd, send_line, strlen(send_line)); if(rt < 0 ){ perror("write failed"); } printf("sended bytes: %zu \\n", rt); } } if(FD_ISSET(sockfd,&readmask)){ int n = read(sockfd, recv_line, MAXDATASIZE); if(n < 0){ perror("read failed"); } else if (n == 0){ perror("server terminated \\n"); } recv_line[n] = 0; fputs(recv_line, stdout); fputs("\\n", stdout); } } close(sockfd); }
|
多客户端实现
对于多客户端,需要将监听socket 放入文件描述符数组中,当有新的连接请求发生时,监听socket,创建新的连接socket,并将其加入到数组中,设置读事件。再之后轮询文件描述符,如果某个描述符就绪,那么就进行读过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| int main() { struct sockaddr_in cs; int c_size , recvbytes , listen_socket_fd , client_fd , i ; char buf[MAXDATASIZE]; listen_socket_fd = tcp_listen_socket(SERVERPORT); fd_set rfds , readmask; int retval , maxfd; FD_ZERO(&rfds); FD_SET(listen_socket_fd, &rfds); maxfd = listen_socket_fd; for(;;) { readmask = rfds; retval = select(maxfd+1, &readmask, NULL, NULL, NULL); if (FD_ISSET(listen_socket_fd, &readmask)){ if((client_fd = accept(listen_socket_fd, (struct sockaddr *) &cs,&c_size) == -1) { perror("accept"); exit(1); } printf("accept success! fd=%d\\n", client_fd); FD_SET(client_fd, &rfds); if(client_fd > maxfd) maxfd = client_fd; } int fd ; for (fd = 0 ; fd < maxfd; fd++){ if(fd == listen_socket_fd) continue; if(FD_ISSET(fd, &readmask)){ if((recvbytes = read(fd,buf,MAXDATASIZE)) == -1){ perror("read error"); exit(1); } if (recvbytes == 0){ close(fd) ; FD_CLR(fd, &rfds); continue ; } for(i=0; i< recvbytes; i++){ buf[i] = process_char(buf[i]); } write(fd, buf, recvbytes); memset(buf, 0x00, sizeof(buf)); } } }
|
非阻塞的select 的读写过程
对于 非阻塞情况下, 需要考虑的每次读写并不等待到读写过程执行完毕。这样就导致需要在用户态,对读写数据,实现有状态的结构来保证读写的安全性
非阻塞IO+多路复用情况下的系统调用行为
操作 内核缓冲区状态 阻塞模式 非阻塞模式
read() 接收缓冲区有数据 立即返回 立即返回
1
| 接收缓冲区无数据 等待数据到来 立即返回(-1)errno=EAGAIN
|
write() 发送缓冲区空闲 全部数据写入发送缓冲区后返回 能写多少写多少
1
| 发送缓冲区不空闲 等待发送缓冲区空闲 立即返回(-1)errno=EAGAIN
|
accept() 接收缓冲区有数据 立即返回 立即返回
1
| 接收缓冲区无数据 等待数据到来 立即返回(-1)errno=EAGAIN
|
如果不设置状态,每次的读写 都会根据 缓冲区的大小读写对应大小的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| int main() { struct sockaddr_in cs; int c_size, recvbytes, listen_socket_fd, client_fd, i; char buf[MAXDATASIZE]; listen_socket_fd = tcp_listen_socket(SERVERPORT); fcntl(listen_socket_fd, F_SETFL, O_NONBLOCK); fd_set rfds , readmask; int retval, maxfd; FD_ZERO(&rfds); FD_SET(listen_socket_fd, &rfds); maxfd = listen_socket_fd; for(;;){ readmask = rfds; retval = select(maxfd+1, &readmask, NULL , NULL, NULL); if(FD_ISSET(listen_socket_fd, &readmask)){ if((client_fd = accept(listen_socket_fd,(struct sockaddr*)&cs ,&c_size)) == -1) { if(error == EAGAIN) continue; perror("accept"); exit(1); } printf("accept success ! fd= %d\\n",client_fd); fcntl(client_fd, F_SETFL,O_NONBLOCK); FD_SET(client_fd, &rfds); if(client_fd > maxfd) maxfd = client_fd; } int fd ; for (fd = 0 ; fd<=maxfd; fd++){ if(fd == listen_socket_fd) continue ; if(FD_ISSET(fd, &readmask)){ if((recvbytes = read(fd, buf, MAXDATASIZE)) == -1 { perror("read error"); exit(1); } if(recvbytes == 0){ close(fd); continue; } for (i = 0 ; i < recvbytest; i++){ buf[i] = process_char(buf[i]); } write(fd, buf, recvbytes); memset(buf, 0x00 , sizeof(buf)); } } } }
|
设置用户态缓冲区数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
| # define MAX_LINE 1024 # define FD_INIT_SIZE 128 struct Buffer { int connect_fd ; char buffer[MAX_LINE]; size_t writeIndex; size_t readIndex; int readable ; };
struct Buffer *alloc_Buffer(){ struct Buffer *buffer = malloc(sizeof(struct Buffer)); if(! buffer) return NULL; buffer -> connect_fd = 0 ; buffer -> writeIndex = 0; buffer -> readIndex = 0; buffer -> readable = 0; return buffer; } void free_Buffer(struct Buffer *buffer){ free(buffer); } char process_char(char c){ if(c >= 'a' && c <= 'z'){ return c - 32; } esle if (c >= 'A' && c <= 'Z'){ return c + 32; } return c ; }
int onSocketRead(int fd , struct Buffer *buffer){ char buf[1024]; int i ; ssize_t result ; while(1) { result = read(fd, buf, sizeof(buf)); if(result <= 0) break; for(i = 0 ; i < result ; ++i){ if(buffer->writeIndex < sizeof(buffer->buffer)) buffer->buffer[buffer->writeIndex++] = process_char(buf[i]); if(buf[i] == '\\n'){ buffer->readable = 1; } } } if (result == 0){ return 1 ; } else if (result < 0){ if (errno == EAGAIN) return 0; return -1 ; } return 0 }
int onSocketWrite(int fd, struct Buffer *buffer){ while(buffer->readIndex < buffer-> WriteIndex){ ssize_t result = write(fd, buffer->buffer + buffer->readIndex, buffer->writeIndex - buffer->readIndex); if(result<0){ if(errno === EAGAIN) return 0; return -1; } buffer->readIndex += result ; } if (buffer->readIndex == buffer->writeIndex) buffer->readIndex = buffer->writeIndex = 0; buffer -> readable = 0 ; return 0; }
int main(){ struct sockaddr_in cs; int a_size, i ; int listen_socket_fd; listen_socket_fd = tcp_listen_socket(SERVERPORT); fcntl(listen_socket_fd, F_SETFL, O_NONBLOCK); struct Buffer *buffer[FD_INIT_SIZE]; for(i = 0 ; i< FD_INIT_SIZE; ++i){ buffer[i] = alloc_Buffer(); } fd_set readfds , writefds, exfds; int retval ; int maxfd = listen_socket_fd ; for (;;){ FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exfds); FD_SET(listen_socket_fd, &readfds); for(i=0; i < FD_INIT_SIZE ; ++i){ if(buffer[i]->connect_fd > 0){ if(buffer[i] -> connect_fd >maxfd) maxfd = buffer[i]->connect_fd; FD_SET(buffer[i]->connect_fd, &readfds); if(buffer[i] -> readable){ FD_SET(buffer[i]->connect_fd,&writefds); } } } retval = select(maxfd + 1 ,&readfds,&writefds,&exfds,NULL); if(retval < 0){ perror("select error"); } if(FD_ISSET(listen_socket_fd, &readfds)){ int fd = accept(listen_socket_fd,(struct sockaddr*)&cs,&a_size); if(fd == -1){ perror("accept"); exit(1); }else if(fd > FD_INIT_SIZE){ perror("too many connections"); close(fd); }else{ printf("accept success! fd = %d\\n", fd); fcntl(fd, F_SETFL, O_NONBLOCK); if(buffer[fd]->connect_fd == 0){ buffer[fd]->connect_fd = fd; } else { perror("connection already exists"); } } for(i = 0 ; i < maxfd + 1 ; ++i){ int r = 0; if (i == listen_socket_fd) continue; if(FD_ISSET(i, &readfds)){ r = onSocketRead(i, buffer[i]); } if(r==0 && FD_ISSET(i, &writefds)){ r = onSocketWrite(i,buffer[i]); } if(r) { buffer[i] -> connect_fd = 0; close(i); } } } } }
|
select的实现原理
select的实现原理是通过将用户态定义的,读/写/异常数组拷贝到内核中的fd_set_bits中,在该结构中存储了用户关心通知的 in,out,ex数组。以及res_in,res_out,res_ex状态数组。在内核态中,遍历所有的文件描述符,在初次遍历中,将进程放入对应描述符的等待队列中,当有事件发生时,比较 res_* 与 * 数组索引的状态,如果该fd的关注状态(通过FD_SET设置)与 事件就绪状态一致,就在事件计数器中加1,最终返回本次执行的就绪事件数,再有用户态轮询查询就绪的fd进行处理。

poll
poll的原理与select 类似,在poll中,它接收一个pollfd数组。
1 2 3 4 5 6 7 8 9
| #include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout);
<0 表示在有事件发生之前永远等待 =0 表示不阻塞进程,立即返回 >0 表示poll调用方等待指定的毫秒数后返回
|
在每个pollfd元素中,定义了两个结构 events : 用于描述事件类型,revents :用于描述就绪的事件类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| struct pollfd{ int fd; short events; short revents; };
|
使用poll调用,其实现过程是,将 描述符添加到 events 结构中,设置 描述符和期望的事件类型。
当资源准备就绪后,poll会将对应的事件类型写入到revents中,再在用户态中比较 revents中的状态与期望状态,然后执行对应的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| char buf[MAXDATASIZE]; struct hostent* host; struct sockaddr_in serv_addr; char send_line[MAXDATASIZE], recv_line[MAXDATASIZE] if(argc < 2){ fprintf(stderr, "Please enter the server's hostname"); exit(1); } if((host = gethostbyname(argv[1])) == NULL){ perror("gethostbyname"); exit(1); } if((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1{ perror("创建client socekt 失败"); exit(1); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(SERVPORT); serv_addr.sin_addr = *((struct in_addr *)host->h_addr);
if((connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr))) == -1){ perror("connect error"); exit(1); }
struct pollfd event_set[2]; event_set[0].fd = STDIN_FILENO; event_set[0].events = POLLIN; event_set[1].fd = sockfd; event_set[1].events = POLLIN;
int retval; for(;;) { retval = poll(event_set, 2, -1); if(retval < 0) perror("poll()"); if(event_set[0],revents & POLLIN){ if(fgets(send_line, MAXDATASIZE, stdin) != NULL){ if(!strcmp(send_line, "quit\\n")){ break; } int i = strlen(send_line); if(send_line[i - 1] == '\\n'){ send_line[i - 1] = 0 ; } printf("sending '%s'\\n", send_line); size_t rt =write(sockfd , send_line, strlen(send_line)); if(rt < 0){ perror("write failed"); } printf("sended bytes: %zu \\n", rt); } } if(event_set[1].revents & POLLIN){ int n = read(sockfd, recv_line, MAXDATASIZE); if(n < 0){ perror("read failed"); } else if (n == 0) { perror("server terminated \\n"); } recv_line[n] = 0 ; fputs(recv_line, stdout); fputs("\\n", stdout); } } close(sockfd);
|
在poll的服务端实现中,需要在接收新的文件描述符后,将其添加到event_set中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| int main() { struct sockaddr_in cs; int a_size , i ; int listen_socket_fd ; listen_socket_fd = tcp_listen_socket(SERVERPORT); fcntl(listen_socket_fd , F_SETFL, 0_NONBLOCK); struct Buffer *buffer[FD_INIT_SIZE]; for(i = 0 ; i < FD_INIT_SIZE; ++i){ buffer[i] = alloc_Buffer(); } int retval ; struct pollfd event_set[FD_INIT_SIZE]; event_set[0].fd = listen_socket_fd; event_set[0].events = POLLIN; for(i=1; i< FD_INIT_SIZE;i++){ event_set[i].fd = -1; } for(;;){ for(i = 1; i < FD_INIT_SIZE; i++) { if(buffer[i]->connect_fd > 0){ event_set[i].events |= POLLOUT; } else if (event_set[i].revents & POLLOUT){ event_set[i].events ^= POLLOUT; } } } retval = poll(event_set, FD_INIT_SIZE, -1); if(retval < 0){ perror("select error"); } if(event_set[0].revents & POLLIN){ int fd = accept(listen_sockeN_fd, (struct sockaddr*)&cs, &a_size); if(fd == -1){ perror("accept"); exit(1); }else if (fd > FD_INIT_SIZE){ perror("too many connections"); close(fd); } else { printf("accept success! fd =%d\\n", d); fcntl(fd , F_SETFL, O_NONBLOCK); for(i = 1 ; i < FD_INIT_SIZE ; i++){ if(event_set[i].fd < 0){ event_set[i].fd = fd; event_set[i].events = POLLIN; buffer[i] ->connect_fd =fd ; break; } } } } for(i=0 ; i < FD_INIT_SIZE; ++i){ int r = 0 ; int currfd = event_set[i].fd; if (currfd == listen_socket_fd || currfd < 0) continue; if (event_set[i].revents & POLLIN){ r = onSocketRead(currfd, buffer[i]); } if (r == 0 && event_set[i].revents & POLLOUT){ r = onSocketWrite(currfd, buffer[i]); } if (r) { close(currfd); } } }
|
poll的实现原理
poll的内核层面实现原理与select的原理类似。
在发起poll调用后,将传入poll中的event_set结构传入内核,内核中遍历fd_set中的event,如果有数据准备就绪,则将对应的就绪事件写入revent中,继续查询下一个event结构。之后通过查询event_set中的revent中的就绪状态来完成读写过程。

select 和 poll的局限
在select的实现中,每一次的检测过程都需要遍历所有的文件描述符,同时用户态的比特数组被复了,导致数组会被select更新状态。另一方面,每一次调用,都要进行全新的拷贝过程,当数据量过大时,仍然会产生较高的运行成本。另外,select中还对文件描述符的最大长度存在限制。
在poll中,poll通过event_set结构将event结构组织为event数组结构,解决了最大长度的限制,但是在事件触发时,仍然需要通过轮询查询具体哪个事件发生了变更,同时,虽然减少了拷贝信息,以及不用遍历每一个文件描述符,但是仍然存在较大的 由用户态向内核态拷贝结构的过程。
epoll
在epoll 的实现中,多路复用被分为了三个阶段
- epoll_create() 调用epoll_create() 在内核中创建对应的epoll实例。epoll 中可以动态调整epoll_event的数量
- epoll_ctl() 实现了一个控制方法,将事件与描述符的关系通过方法调用的形式将更改注册到内核结构中
- epoll_wait() 最终的通知等待,在调用epoll_wait后,查询 epoll_wait的文件描述符,从中获取到每个描述符的事件信息,进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <sys/epoll.h> int epoll_create(int size);
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
#inlcude <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
typedef union epoll_data{ void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
struct epoll_event { uint32_t events ; epoll_data_t data ; }
|
epoll的多路复用实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| int main(int argc , char *argv[]){ int sockfd , sendbytes; char buf[MAXDATASIZE]; struct hostent* host; struct sockaddr_in serv_addr; char send_line[MAXDATASIZE], recv_line[MAXDATASIZE]; int i , n ; if(argc < 2){ fprintf(stderr, "please enter the server's hostname"); exit(1); } if((host gethostbyname(argv[1])) == NULL){ perror("gethostbyname"); exit(1); } if((sockfd = socket(PF_INET,SOCK_STREAM,0)) == -1){ perror("创建client socekt 失败"); exit(1); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(SERVPORT); serv_addr.sin_addr = *((struct in_addr *)host->h_addr); if((connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr))) == -1) { perror("connect error"); exit(1); } int efd ; struct epoll_event event; struct epoll_events *events; efd = epoll_create(10); if(efd == -1){ perror("epoll create failed"); } event.data.fd = sockfd; event.events = EPOLLIN; if(epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &event) == -1){ perror("epoll_ctl socket failed"); exit(1); } event.data.fd = STDIN_FILENO; event.events = EPOLLIN; if(epoll_ctl(efd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1){ perror("epoll_ctl and listen fd failed"); } events = calloc(2, sizeof(event)); for (;;){ n = epoll_wait(efd, events,2,-1); printf("epoll_wait_wakeup\\n"); for(i = 0 ; i < n; i++ ){ if(STDIN_FILENO == events[i].data.fd){ if(fgets(send_line,MAXDATASIZE, stdin) != NULL){ if(!strcmp(send_line, "quit\\n")){ goto quit; } int i = strlen(send_line); if(send_line[i - 1] == '\\n'){ send_line[i - 1] = 0; } printf("sending '%s'\\n", send_line); size_t rt = write(sockfd, send_line, strlen(send_line)); if(rt < 0){ perror("write failed"); } printf("sended bytes: %zu \\n", rt); } } if(sockfd == events[i].data.fd){ int n = read(sockfd, recv_line, MAXDATASIZE); if (n < 0){ perror("read failed"); }else if (n == 0){ perror("server terminated \\n"); } recv_line[n] = 0; fputs(recv_line, stdout); fputs("\\n", stdout); } } } }
|
而在服务端,则需要 注册 监听socket, 创建efd,当监听socket 获得事件通知时,将新的socket注册到efd中,这样通过epoll同时管理监听socket和连接socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| int main() { struct sockaddr_in cs; int a_size, i ; int listen_socket_fd ; listen_socket_fd = tcp_listen_socket(SEVERPORT); fcntl(listen_socket_fd, F_sETFL, O_NONBLOCK); struct Buffer *buffer[FD_INIT_SIZE]; for(i = 0 ; i < FD_INIT_SIZE ; ++i){ buffer[i] = alloc_Buffer(); } int retval ; int efd; struct epoll_event event; efd = epoll_create(10); if(efd == -1){ perror("epoll create failed"); } event.data.fd = listen_socket_fd; event.events = EPOLLIN; if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_socket_fd, &event) == -1){ perror("epoll_ctl and listen fd failed"); } events = calloc(FD_INIT_SIZE , sizeof(evnet)); for (;;){ retval = epoll_wait(efd, events, FD_INIT_SIZE, -1); if (retval < 0){ perror("select error"); } for (i = 0 ; i < retval ; i++){ if(events[i].data.fd == listen_socket_fd){ int fd = accept(listen_socket_fd , (struct sockaddr*)&cs, &a_size); if(fd == -1){ perror("accept"); exit(1); }else if (fd > FD_INIT_SIZE){ perror("too many connections"); close(fd); } else { printf("accept success! fd= %d\\n", fd); fcntl(fd, F_SETFL, O_NONBLOCK); event.data.fd = fd ; event.events = EPOLLIN ; if(epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1){ perror("epoll_ctl_add_listen_fd_failed"); } } } else { int r = 0 ; int currfd = events[i].data.fd ; if (events[i].events & EPOLLIN){ print("get EPOLLIN event on socket fd== %d\\n", currfd); r = onSocketRead(currfd,buffer[currfd]); if(buffer[currfd]->readable){ event.data.fd = currfd; event.events |= EPOLLOUT; if(epoll_ctl(efd,EPOLL_CTL_MODE, currfd, &event) == -1){ perror("epoll_ctl mod curr fd failed"); } } } if (r == 0 && events[i].events & EPOLLOUT){ r = onSocketWrite(currfd, buffer[currfd]); if (!buffer[currfd]->readable){ event.data.fd = currfd; event.events = EPOLLIN; if(epoll_ctl(efd,EPOLL_CTL_MOD, currfd, &event) == -1){ perror("epoll_ctl mod curr fd failed"); } } } if (r) { if(epoll_ctl(efd, EPOLL_CTL_DEL, currfd, &event) == -1){ perror("epoll_ctl mov curr fd failed"); } close(currfd); } } } } }
|
条件触发与边缘触发
epoll 还提供了 不同的通知方式来提高整体的处理效率
条件触发是epoll的默认工作模式,其行为类似于select和poll。只要文件描述符上存在可处理的数据,每次调用epoll_wait都会返回该文件描述符的就绪状态。
边缘触发模式只在文件描述符状态发生变化时才通知。一旦epoll_wait返回某个文件描述符就绪,如果不处理完所有数据,该文件描述符不会再次被通知。
epoll的实现原理
与 select 和 poll的实现不同,epoll直接将 查询结构维护在了内核中,通过创建-注册-查询的方式, 直接向内核中注册与文件描述符关联的事件,当对应的事件发生时,再通过查询事件中的描述符,向用户态响应事件状态,这样最大程度的减少了用户态和内核态间的数据拷贝过程。提高了整体的处理效率。
在实现上,epoll 在内核中实现了一个红黑树结构,将每一个event 结构存储在红黑树中的节点中,提高了整体通知的查询效率,当检查到某个描述符中的事件就绪后,将对应的节点加入到rdllist中,当使用epoll_wait查询时,可以通过查询rdllist中的事件信息,直接从队列中获取事件,提高了事件的查询处理效率。

同步I/O与异步I/O
同步I/O与异步I/O的根本区别在于数据拷贝阶段用户线程是否需要等待。在同步I/O模型中,当用户线程发起I/O请求后,无论是在等待数据准备还是数据拷贝阶段,用户线程都必须参与并等待操作完成。即使是非阻塞I/O,虽然可以通过轮询避免在数据准备阶段阻塞,但在最终进行read/write系统调用时,用户线程仍然需要同步等待数据从内核空间拷贝到用户空间。
前文讨论的select、poll、epoll等多路复用机制,虽然能够高效地监控多个文件描述符的状态变化,避免了传统阻塞I/O中”一个连接一个线程”的资源浪费问题,但它们本质上仍然属于同步I/O模型。这些机制只是在”等待数据准备”阶段提供了更好的并发处理能力,当epoll_wait返回就绪的文件描述符后,用户线程调用read或write进行实际数据传输时,依然需要同步等待数据拷贝过程的完成。
真正的异步I/O模型则完全不同。用户线程向内核提交I/O请求后立即返回,继续执行其他任务,整个数据准备和数据拷贝过程都由内核独立完成。当I/O操作完全结束后,内核通过回调函数、信号或事件通知等机制告知用户线程结果。在这个过程中,用户线程从不需要主动等待或参与数据传输,实现了真正意义上的异步处理。
因此,虽然多路复用技术大大提高了服务器处理并发连接的能力,使单个线程能够管理数千甚至数万个连接,但由于用户线程在数据拷贝阶段仍需等待,所以它们仍然被归类为同步I/O。这种分类基于I/O操作的完整生命周期来判断,而不仅仅是数据准备阶段的行为特征。
三种多路复用模型对比
| 模型 |
select |
poll |
epoll |
| 特征 |
使用fd_set位图结构,通过bit位标记文件描述符状态 |
|
|
| 每次调用需要重新设置fd_set,系统调用开销大 |
|
|
|
| 内核通过线性扫描检查所有fd状态,返回后用户态再次线性扫描找到就绪fd |
|
|
|
| 硬编码限制最大1024个文件描述符(FD_SETSIZE) |
使用pollfd结构体数组,每个元素包含fd、events和revents字段 |
|
|
| 解决了select的fd数量限制,支持任意数量的文件描述符 |
|
|
|
| 避免了每次重新设置描述符集合的开销 |
|
|
|
| 但仍需要内核遍历所有pollfd,用户态遍历检查revents |
内核维护红黑树存储监听的fd,使用就绪链表维护活跃事件 |
|
|
| 分离控制操作(epoll_ctl)和等待操作(epoll_wait) |
|
|
|
| 基于回调机制,fd就绪时直接加入就绪队列 |
|
|
|
| 支持条件触发和边缘触发两种通知模式 |
|
|
|
| 性能 |
连接数小于100的小型应用 |
|
|
| 需要跨平台支持的场景 |
|
|
|
| 对性能要求不高,追求代码简洁的情况 |
中等规模应用(100-1000个连接) |
|
|
| 需要突破select的fd限制但不需要最高性能 |
|
|
|
| 希望接口比select更清晰的场景 |
高并发服务器(1000+连接) |
|
|
| 对性能有严格要求的Linux服务 |
|
|
|
| 需要精细控制事件通知行为的应用 |
|
|
|
| 场景 |
连接数小于100的小型应用 |
|
|
| 需要跨平台支持的场景 |
|
|
|
| 对性能要求不高,追求代码简洁的情况 |
使用pollfd结构体数组,每个元素包含fd、events和revents字段 |
|
|
| 解决了select的fd数量限制,支持任意数量的文件描述符 |
|
|
|
| 避免了每次重新设置描述符集合的开销 |
|
|
|
| 但仍需要内核遍历所有pollfd,用户态遍历检查revents |
内核维护红黑树存储监听的fd,使用就绪链表维护活跃事件 |
|
|
| 分离控制操作(epoll_ctl)和等待操作(epoll_wait) |
|
|
|
| 基于回调机制,fd就绪时直接加入就绪队列 |
|
|
|
| 支持水平触发(LT)和边缘触发(ET)两种通知模式 |
|
|
|