/ | select | poll | epoll |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | 数组 | 链表 | 哈希表 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1) |
最大连接数 | 1024(x86)或 2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用select,都需要把fd集合从用户态拷贝到内核态 | 每次调用poll,都需要把fd集合从用户态拷贝到内核态 | 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝 |
select(pselect)
select 直接操纵多个文件描述符的集合 fd_set
流程:
- 创建文件描述符的集合 fd_set
- 将监听的socket和客户端socket加入 fd_set
- select()
- 用 FD_ISSET 判断哪个 fd 有事件
- 监听的socket有事件,表示有新客户端连接请求
- 客户端socket有事件,有数据或连接断开
select 只有“水平触发”模式,如果报告了fd后事件没有被处理或数据没有被全部读取,那么下次select时会再次报告该fd
select函数的缺点
- bitmap默认大小为1024,虽然可以调整但还是有限度的
- 需要遍历所有描述符
- rset每次循环都必须重新置位为0,不可重复使用
- 将rset从用户态拷贝到内核态,由内核态直接判断文件描述符是否有数据的操作,这样比直接用户态判断要快。
尽管将rset从用户态拷贝到内核态并由内核态判断是否有数据,但还是有拷贝的开销
#include <sys/types.h>
#include <sys/time.h>
// 初始化空集合
void FD_ZERO(fd_set *fdset);
// 从集合中清除fd
void FD_CLR(int fd, fd_set *fdset);
// 添加fd到集合
void FD_SET(int fd, fd_set *fdset);
// 判断是否在set中,在 非零值,不在 零
int FD_ISSET(int fd, fd_set *fdset);
// 超时值
struct timeval {
time_t tv_sec; // seconds
long tv_usec; // microseconds
}
/*
用于测试fd_set中是否由fd处于可读或可写或错误状态
当__readfds中有可读fd,__writefds中有科协fd,__exceptfds中有错误fd
成功返回状态发生变化的fd总数,失败返回-1并设置errno
*/
extern int select (int __nfds, // 需要测试的fd数目
fd_set * __readfds,
fd_set * __writefds,
fd_set * __exceptfds,
struct timeval * __timeout); // Linux在退出时会将此选项清空,故每次进入select前重新设置此选项
select 示例代码
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
int main(int argc, char const *argv[])
{
int server_sockfd, client_sockfd;
int server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
int result;
fd_set readfds, testfds;
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(9734);
server_len = sizeof(server_address);
bind(server_sockfd, (struct sockaddr *)&server_address, server_len);
listen(server_sockfd, 5);
FD_ZERO(&readfds);
FD_SET(server_sockfd, &readfds);
while (1) {
char ch;
int fd;
int nread;
testfds = readfds;
printf("server waiting/n");
result = select(FD_SETSIZE, &testfds, NULL, NULL, 0);
if (result < 1) {
perror("select failed");
exit(1);
}
for (fd = 0; fd < FD_SETSIZE; fd++) {
if (FD_ISSET(fd, &testfds)) {
if (fd == server_sockfd) {
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,
(struct sockaddr*) &client_address,
&client_len);
FD_SET(client_sockfd, &readfds);
printf("adding client on fd %d/n", client_sockfd);
} else {
ioctl(fd, FIONREAD, &nread);
if (nread == 0) {
close(fd);
FD_CLR(fd, &readfds);
printf("removing client on fd %d/n", fd);
} else {
read(fd, &ch, 1);
printf("serving client on fd %d/n", fd);
write(fd, &ch, 1);
}
}
}
}
}
return 0;
}
poll(ppoll)
与select没有本质差别,管理多个文件描述符也是使用轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量限制。
select采用了bitmap,poll采用了数组。
poll与select相同的缺点:文件描述符的数组或位图被整体复制于用户态和内核态之间,不论这些文件描述符是否有事件,它的开销随着文件描述符的增加而线性增加。二者在返回后都需要遍历整个描述符的数组。
内核将用户的fds结构体数组拷贝到内核中,当有事件发生时内核再将所有时间都返回到fds数组中,
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
int poll(struct poll_fd *fds, // 数组指针
nfds_t nfds, // 数组大小
int timeout); // 超时时间
events 和 revents 的取值:
poll 使用并不方便,代码比select和epoll都复杂,但性能不如epoll
poll 代码
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#define NFDS 100 // fds数组的大小
// 创建一个用于监听的socket
int CreateSocket() {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(-1 != listenfd);
struct sockaddr_in ser;
memset(&ser, 0, sizeof(ser));
ser.sin_family = AF_INET;
ser.sin_port = htons(6000);
ser.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(listenfd, (struct sockaddr *)&ser, sizeof(ser));
assert(-1 != res);
listen(listenfd, 5);
return listenfd;
}
// 初始化fds结构体数组
void InitFds(struct pollfd *fds) {
int i = 0;
for (; i < NFDS; ++i) {
fds[i].fd = -1;
fds[i].events = 0;
fds[i].revents = 0;
}
}
// 向fds结构体数组中插入一个文件描述符
void InsertFd(
struct pollfd *fds, int fd,
int flag) //此处flag是为了判断是文件描述符c,还是listenfd,来设置events
{
int i = 0;
for (; i < NFDS; ++i) {
if (fds[i].fd == -1) {
fds[i].fd = fd;
fds[i].events |= POLLIN;
if (flag) {
fds[i].events |= POLLRDHUP;
}
break;
}
}
}
// 从fds结构体数组中删除一个文件描述符
void DeleteFd(struct pollfd *fds, int fd) {
int i = 0;
for (; i < NFDS; ++i) {
if (fds[i].fd == fd) {
fds[i].fd = -1;
fds[i].events = 0;
break;
}
}
}
// 获取一个已完成三次握手的连接
void GetClientLink(int fd, struct pollfd *fds) {
struct sockaddr_in cli;
socklen_t len = sizeof(cli);
int c = accept(fd, (struct sockaddr *)&cli, &len);
assert(c != -1);
printf("one client link success/n");
InsertFd(fds, c, 1);
}
// 断开一个用户连接
void UnlinkClient(int fd, struct pollfd *fds) {
close(fd);
DeleteFd(fds, fd);
printf("one client unlink/n");
}
// 处理客户端发送来的数据
void DealClientData(int fd, struct pollfd *fds) {
char buff[128] = {0};
int n = recv(fd, buff, 127, 0);
if (n <= 0) {
UnlinkClient(fd, fds);
return;
}
printf("%s/n", buff);
send(fd, "ok", 2, 0);
}
// poll返回后,处理就绪的文件描述符
void DealFinishFd(struct pollfd *fds, int listenfd) {
int i = 0;
for (; i < NFDS; ++i) {
if (fds[i].fd == -1) {
continue;
}
int fd = fds[i].fd;
if (fd == listenfd && fds[i].revents & POLLIN) {
GetClientLink(fd, fds);
//获取连接
} else if (fds[i].revents & POLLRDHUP) {
UnlinkClient(fd, fds);
//断开连接
} else if (fds[i].revents & POLLIN) {
DealClientData(fd, fds);
//处理客户端数据
}
}
}
int main() {
int listenfd = CreateSocket();
struct pollfd *fds = (struct pollfd *)malloc(sizeof(struct pollfd) * NFDS);
// malloc一个fds结构体数组
assert(NULL != fds);
InitFds(fds);
//初始化fds结构体数组
InsertFd(fds, listenfd, 0);
//插入文件描述符listenfd
while (1) {
int n = poll(fds, NFDS, -1);
if (n <= 0) {
printf("poll error/n");
continue;
}
DealFinishFd(fds, listenfd);
//处理就绪的文件描述符
}
free(fds);
}
优点:
- poll() 不要求开发者计算最大文件描述符加一的大小。
- poll() 在应付大数目的文件描述符的时候速度更快,相比于select。
- 它没有最大连接数的限制,原因是它是基于链表来存储的。
- 在调用函数时,只需要对参数进行一次设置就好了
缺点:
- 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义(epoll可以解决此问题)
- 与select一样,poll返回后,需要轮询pollfd来获取就绪的描述符,这样会使性能下降
- 同时连接的大量客户端在一时刻可能只有很少的就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
epoll(epoll_pwait)
解决了fd_set拷贝和轮询的问题。内核每次返回的都是已就绪的文件描述符。
- 创建epoll句柄,它本身就是一个fd,需要关闭
int epoll_create(int size); // 返回一个文件描述符,参数在新版本中被忽略,但是要给一个大于0的数
- 注册需要监视的fd和事件
int epoll_ctl(int epfd, int op, // 选项为3个宏:添加 EPOLL_CTL_ADD,删除 EPOLL_CTL_DEL,修改 EPOLL_CTL_MOD int fd, // 需要监听的fd(文件描述符) struct epoll_event *event // 需要监听什么事 ); struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; //events可以是以下几个宏的集合: EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里 typedef union epoll_data { void *ptr; int fd; _uint32_t u32; _uint64_t u64; }epoll_data_t;
- 等待事件发生
// 返回就绪事件的个数,失败-1,超时0 int epoll_wait(int epfd, struct epoll_event * events, // 用于接收内核返回的就绪事件的数组 int maxevents, // 一次最多能处理的事件个数 int timeout // 超时时间,为0则立即返回,-1永不超时 );
epoll_LT 示例模式
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
const int max_events = 128;
int server_socket;
int epoll_fd;
void sig_handler(int signo) {
close(server_socket);
close(epoll_fd);
std::cout << "recv SIGTERM, exit process." << std::endl;
exit(EXIT_SUCCESS);
}
int main(int argc, char const *argv[]) {
struct sigaction term_action;
sigset_t all_sig;
sigfillset(&all_sig);
term_action.sa_mask = all_sig;
term_action.sa_handler = sig_handler;
sigaction(SIGTERM, &term_action, nullptr);
server_socket = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8080);
server_addr.sin_addr.s_addr = inet_addr("0.0.0.0");
bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(server_socket, 128);
epoll_fd = epoll_create(1);
epoll_event server_socket_event;
server_socket_event.events = EPOLLIN;
server_socket_event.data.fd = server_socket;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket, &server_socket_event);
epoll_event events[max_events];
while (true) {
int n = epoll_wait(epoll_fd, events, max_events, -1);
for (int i = 0; i < n; ++i) {
if (events[i].data.fd == server_socket) {
int client_socket = accept(server_socket, nullptr, nullptr);
std::cout << "accept new client: " << client_socket << std::endl;
epoll_event client_socket_event;
client_socket_event.events = EPOLLIN;
client_socket_event.data.fd = client_socket;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket, &client_socket_event);
} else if (events[i].events & EPOLLRDHUP) {
// 理论上 EPOLLRDHUP 信号是对方挂断后发出,但实际上可能没有这个信号
std::cout << events[i].data.fd << " disconnect" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);
} else if (events[i].events & EPOLLIN) {
char buf[BUFSIZ];
memset(buf, 0, sizeof(buf));
int client_socket = events[i].data.fd;
int nrecv = recv(client_socket, buf, BUFSIZ, 0);
std::cout << "recv from " << client_socket << " :" << buf << std::endl;
if (nrecv == 0) {
std::cout << events[i].data.fd << " disconnect" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);
}
}
}
}
close(epoll_fd);
close(server_socket);
return 0;
}
epoll 工作模式:
- 水平触发 level trigger:(默认,支持 block socket 和 non-block socket)
若报告了fd事件后没有被处理或数据没有被全部读取,epoll还会再报告该事件 - 边缘触发 edge trigger:(仅支持非阻塞)
若报告了fd事件后没有被处理或数据没有被全部读取,epoll不会再报告该事件。如果不立即处理,数据会丢失。
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/288384.html