多路转接
# 1.IO模型
IO数据读取操作:同步IO(阻塞、非阻塞、信号驱动、多路转接)和异步IO。
IO进行数据读取时会分成等待和数据拷贝两步,如果程序阻塞等待然后进行拷贝会浪费大量时间。此时若把等待的工作让其他函数进行,读取操作非阻塞进行其他的代码运行可以提高程序的效率。
# 2.阻塞和非阻塞的文件读取
#include <unistd.h> #include <fcntl.h> int fcntl(int fd, int cmd, ... /* arg */ );
1
2
3fd: 文件描述符
cmd:F_GETFL -> 获取文件参数,设置时忽略arg。F_SETFL ->通过arg 设置文件状态:O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, O_NONBLOCK
F_GETFL 返回值:成功返回文件状态标志;失败返回-1,并且设置errno
F_SETFL 返回0
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
void SetBlockStatus(int fd) //设置文件为非阻塞
{
int fc = fcntl(fd, F_GETFL);
if (fc < 0)
{
return;
}
fcntl(fd, F_SETFL, fc | O_NONBLOCK);
}
int main()
{
SetBlockStatus(0);
while (1)
{
char buffer[1024];
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
write(1, buffer, strlen(buffer));
}else{
printf("failed error : %d\n", errno); // EAGAIN || EWOULDBLOCK
}
sleep(1);
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 3.多路转接之select
- 一次可以等待多个fd,提高io效率,相比于多线程来说减少了调度消耗的资源
- 缺点:
- 需要用户端保存文件描述符,并且一次select后要使用循环检测并重新设值
- fd_set对应的文件描述符有上限
- 系统底层需要轮询检测fd_set上的就绪态
- 存在用户和内核间的频繁拷贝
#include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); // 设置fd_set的函数 void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
nfds:代表最大文件描述符数值+1
fd_set:位图,一个比特位代表一个sock,最大支持1024个文件
readfds: 只关心读 ; writefds: 只关心写 ; exceptfds: 只关心异常。这些是输入输出参数,一方面用户告知内核需要等待的文件fd,另一方面当文件就绪时内核告知用户就绪的文件fd。由于输出时会将fd_set数据置位,因此需要重新传入,需要应用层保存数据。
timeout:就绪立即返回;不就绪就等待timeout时间,时间到立即返回。输入的用户设定的时间,输出的是等待后剩余的时间。若值为NULL,代表阻塞等;设为0,非阻塞,轮询检测;
struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
1
2
3
4
- 返回值:失败返回-1,并且设置errno。成功返回三个位图中就绪文件描述符的个数,如果timeout释放有可能导致结果为0。
# 代码示例
- Socket.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
class Sock{
public:
static int InitSocket() {
int _sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(_sockfd == -1) {
std::cerr << "Create Socket Failed" << std::endl;
exit(-1);
}
//设置服务器处于TIME_WAIT状态也能建立连接
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
return _sockfd;
}
static void Bind(int sock, uint16_t port) {
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if(bind(sock, (sockaddr*)&local, sizeof(local)) == -1) {
std::cerr << "Bind Failed" << std::endl;
exit(-1);
}
}
static void Listen(int sock) {
if(listen(sock, 5) == -1) {
std::cerr << "Bind Failed" << std::endl;
exit(-1);
}
}
static int Accept(int sock) {
struct sockaddr_in remote;
socklen_t len = sizeof(remote);
int fd = accept(sock, (sockaddr*)&remote, &len);
return fd;
}
static void Connect(int sock, std::string ip, uint16_t port) {
struct sockaddr_in remote;
memset(&remote, 0, sizeof(remote));
remote.sin_family = AF_INET;
remote.sin_port = htons(port);
remote.sin_addr.s_addr = inet_addr(ip.c_str());
if(connect(sock, (struct sockaddr*)&remote, sizeof(remote)) == 0) {
std::cout << "connect success" << std::endl;
}else{
std::cout << "connect failed" << std::endl;
exit(-1);
}
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
- Select.cpp
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <vector>
#define m_port 8080
#define fSetNUM 1024
int main() {
std::vector<int> fd_array(fSetNUM, -1); // 记录需要等待的套接字
int listen_fd = Sock::InitSocket();
Sock::Bind(listen_fd, m_port);
Sock::Listen(listen_fd);
// int in_sock = Sock::Accept(listen_fd); // 不在此阻塞等待,accept只用来拷贝数据
fd_set readBitset;
while(true) {
fd_array[0] = listen_fd;
int max_fd = listen_fd;
FD_ZERO(&readBitset);
for(int i = 0; i < fSetNUM; i++) {
if(fd_array[i] == -1) continue; //存在监听失败的套接字
FD_SET(fd_array[i], &readBitset);
max_fd = std::max(max_fd, fd_array[i]); // 得到最大的fd
}
struct timeval timeout = {3,0};
int OKNum = select(max_fd + 1, &readBitset, nullptr, nullptr, &timeout);
if(OKNum == -1) {
std::cout << "select wait failed" << std::endl;
}else if(OKNum == 0) {
std::cout << "select timeout" << std::endl;
}else{
// 套接字有事件就绪,不处理会一直通知。
for(int i = 0; i < fSetNUM; i++) {
if(fd_array[i] == -1) continue; //存在监听失败的套接字
if(FD_ISSET(fd_array[i], &readBitset)) {
//读事件就绪,需要得到就绪事件的fd, 通过readBitset返回用户
if(fd_array[i] == listen_fd) {
//新连接来了,添加新的套接字
int sock = Sock::Accept(listen_fd);
int j = 1;
for(; j < fSetNUM ; j++) {
if(fd_array[j] == -1) break;
}
if(j < fSetNUM) fd_array[j] = sock;
else {
close(sock); // 维护的文件描述符超出了select的承载范围
}
}else{
//普通连接的套接字就绪
//read,recv——不会阻塞等待了。但需要处理粘包问题
char Buff[1024] = {0};
ssize_t s = recv(fd_array[i], Buff, sizeof(Buff) - 1, 0);
if(s > 0) {
Buff[s] = 0;
std::cout << "client[" << fd_array[i] << "] :" << Buff << std::endl;
}else if(s == 0){ // 对端连接关闭了
close(fd_array[i]);
fd_array[i] = -1;
}else{
close(fd_array[i]);
fd_array[i] = -1;
}
}
}
}
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 4.多路转接之poll
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout)
1
2
- struct pollfd:
struct pollfd { int fd; /* file descriptor */ short events; /* 用户输入的 */ short revents; /* 系统返回的 */ };
1
2
3
4
5
events POLLIN POLLPRI POLLOUT POLLRDHUP - - - 有读数据 有优先数据 写非阻塞(可写) 对方流套接字关闭 - - - revents POLLIN POLLPRI POLLOUT POLLRDHUP POLLERR POLLHUP POLLNVAL 如TCP带外数据 错误连接 挂起 无效请求
- nfds:文件描述符数组的大小。若对应的fd为负数,相应的event被忽略且revents返回0
- timeout:阻塞等待时间,单位毫秒。设置为0——非阻塞,即便没有文件描述符就绪也会立即返回;设为负数,永久阻塞。
- 返回值:成功返回正数;失败返回-1;若timeout到达并且没有文件描述符就绪,返回0.
#include <poll.h>
#include <unistd.h>
#include <iostream>
int main() {
struct pollfd fdSet;
fdSet.fd = 0; //标准输入
fdSet.events = POLLIN;
fdSet.revents = 0;
while(true) {
int ret = poll(&fdSet, 1, -1);
if(ret == 0) {
std::cout << "time out" << std::endl;
}else if(ret == -1) {
std::cout << "error" << std::endl;
}else{
if(fdSet.revents & POLLIN) { //读事件就绪
char buff[1024] = {0};
ssize_t s = read(fdSet.fd, buff, sizeof(buff) -1);
if(s > 0) {
std::cout << buff << std::endl;
}
}
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 5.多路转接之epoll
# 函数接口
#include <sys/epoll.h> int epoll_create(int size);
1
2
size :代表内核会为多少文件描述符开空间,但是在新内核中无效,必须大于0以兼容旧系统。
返回值:成功返回非负的文件描述符,需要close;失败返回-1
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
1
2
- epfd:
epoll_create
创建的文件描述符,指向一个 epoll instance- op: 用户设置系统对文件描述符fd的操作类型:
EPOLL_CTL_ADD
: 增加fd到epfd;EPOLL_CTL_MOD
: 修改fd的event类型;EPOLL_CTL_DEL
:从epfd删除fd。- fd:需要等待的文件描述符
- event:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
1
2
3
4
5
6
7
8
9
10
11其中uint32_t events可选:
EPOLLIN:关联文件可用于读取操作。 EPOLLOUT:相关文件可用于写入(2)操作。只要用户利用epoll_ctl设置,就会触发一次写就绪 EPOLLRDHUP (since Linux 2.6.17):流套接字对等端关闭连接,或关闭写入连接的一半。(此标志对于编写简单代码以检测对等机关闭特别有用,当使用边缘触发监控时。) EPOLLPRI:存在可用于读取(2)操作的紧急数据。 EPOLLERR:关联的文件描述符出现错误情况。epoll_ wait(2)将始终等待此事件;没有必要在事件中设置它。 EPOLLHUP:关联的文件描述符发生挂起。epoll_ wait(2)将始终等待此事件;没有必要在事件中设置它。 EPOLLET:设置关联文件描述符的边缘触发行为。epoll的默认行为是水平触发的。 EPOLLONESHOT (since Linux 2.6.2):设置关联文件描述符的一次性行为。这意味着在使用epoll_wait(2)拉出事件后,关联的文件描述符在内部被禁用,epoll接口不会报告其他事件。用户必须使用EPOLL_CTL_MOD 。
1
2
3
4
5
6
7
8
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
1
2
- events:输出型参数,系统告诉用户哪些文件描述符对应的时间就绪了。
- maxevents:输出型参数
- 返回值:返回就绪的文件描述符的个数,如果在timeout时间内没有任何文件就绪就返回0;错误返回-1.
#include <sys/epoll.h>
#include "Socket.hpp"
#include <unistd.h>
#define port 8080
#define NUM 32
int main() {
int epfd = epoll_create(64);
int listen_fd = Sock::InitSocket();
Sock::Bind(listen_fd, port);
Sock::Listen(listen_fd);
// 将监听套接字添加到内核
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &evt);
struct epoll_event revt[NUM]; // 保存就绪的事件
// 事件循环
while(true) {
int OKNum = epoll_wait(epfd, revt, NUM, 1000); //获取就绪列表
if(OKNum == 0) {
std::cout << "Time Out" << std::endl;
}else if(OKNum == -1) {
std::cout << "epoll error" << std::endl;
}else{
for(int i = 0; i < OKNum; i++) {
int OKfd = revt[i].data.fd;
if(revt[i].events & EPOLLIN) { // 读事件就绪
if(OKfd == listen_fd) {
int newfd = Sock::Accept(listen_fd);
if(newfd == -1) {
std::cout << "Accept New FD failed" << std::endl;
continue;
}
struct epoll_event newEvt;
newEvt.events = EPOLLIN;
newEvt.data.fd = newfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, newfd, &newEvt);
}else { //其他文件描述符
char buff[1024];
ssize_t s = recv(OKfd, buff, sizeof(buff) - 1, 0);
if(s == 0) {
std::cout << "remote client quit" << std::endl;
close(OKfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, OKfd, nullptr); //删除epoll fd
}else if(s > 0) {
buff[s] = 0;
std::cout << "client[" << OKfd << "]: " << buff << std::endl;
}else {
std::cout << "" << std::endl;
close(OKfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, OKfd, nullptr);
}
}
}else if(revt[i].events & EPOLLOUT) { //写事件就绪
// ,,,
}
}
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# epoll原理
epoll_create创建了一个红黑树(维护一些文件描述符)、等待就绪队列、回调函数。
epoll_ctl用来添加红黑树的一个节点,并对相应的描述符设置关心的事件,系统根据设置的事件添加相应的回调机制。
epoll_wait获取就绪队列中就绪的事件。
硬件将数据拷贝到内核会触发中断,此时执行回调函数,将拷贝到内核的数据添加到就绪队列中,等待上层应用调用。
# epoll的工作方式
- ET(边缘触发)模式——epoll设置EPOLLET
ET通知一次,为了保证数据能被读完,则会要求程序员循环读取。若阻塞读取,前一次读取刚好读取完,则最后一次读取会被卡住,所有ET模式下读取要设置成非阻塞读取。
- LT(水平触发)模式:只要有数据会一直通知。——select、poll、epoll的默认模式