多路转接
# 1.IO模型
IO数据读取操作:同步IO(阻塞、非阻塞、信号驱动、多路转接)和异步IO。
IO进行数据读取时会分成等待和数据拷贝两步,如果程序阻塞等待然后进行拷贝会浪费大量时间。此时若把等待的工作让其他函数进行,读取操作非阻塞进行其他的代码运行可以提高程序的效率。
# 2.阻塞和非阻塞的文件读取
#include <unistd.h> #include <fcntl.h> int fcntl(int fd, int cmd, ... /* arg */ );1
2
3fd: 文件描述符
cmd:F_GETFL -> 获取文件参数,设置时忽略arg。F_SETFL ->通过arg 设置文件状态:O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, O_NONBLOCK
F_GETFL 返回值:成功返回文件状态标志;失败返回-1,并且设置errno
F_SETFL 返回0
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
void SetBlockStatus(int fd) //设置文件为非阻塞
{
    int fc = fcntl(fd, F_GETFL);
    if (fc < 0)
    {
        return;
    }
    fcntl(fd, F_SETFL, fc | O_NONBLOCK);
}
int main()
{
    SetBlockStatus(0);
    while (1)
    {
        char buffer[1024];
        ssize_t s = read(0, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            write(1, buffer, strlen(buffer));
        }else{
            printf("failed error : %d\n", errno); // EAGAIN || EWOULDBLOCK
        }
        sleep(1);
    }
    return 0;
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 3.多路转接之select
- 一次可以等待多个fd,提高io效率,相比于多线程来说减少了调度消耗的资源
 - 缺点:
- 需要用户端保存文件描述符,并且一次select后要使用循环检测并重新设值
 - fd_set对应的文件描述符有上限
 - 系统底层需要轮询检测fd_set上的就绪态
 - 存在用户和内核间的频繁拷贝
 
 
#include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); // 设置fd_set的函数 void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
nfds:代表最大文件描述符数值+1
fd_set:位图,一个比特位代表一个sock,最大支持1024个文件
readfds: 只关心读 ; writefds: 只关心写 ; exceptfds: 只关心异常。这些是输入输出参数,一方面用户告知内核需要等待的文件fd,另一方面当文件就绪时内核告知用户就绪的文件fd。由于输出时会将fd_set数据置位,因此需要重新传入,需要应用层保存数据。
timeout:就绪立即返回;不就绪就等待timeout时间,时间到立即返回。输入的用户设定的时间,输出的是等待后剩余的时间。若值为NULL,代表阻塞等;设为0,非阻塞,轮询检测;
struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };1
2
3
4
- 返回值:失败返回-1,并且设置errno。成功返回三个位图中就绪文件描述符的个数,如果timeout释放有可能导致结果为0。
 
# 代码示例
- Socket.hpp
 
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
class Sock{
public:
    static int InitSocket() {
        int _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if(_sockfd == -1) {
            std::cerr << "Create Socket Failed" << std::endl;
            exit(-1); 
        }
        //设置服务器处于TIME_WAIT状态也能建立连接
        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 
        return _sockfd;
    }
    static void Bind(int sock, uint16_t port) {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        if(bind(sock, (sockaddr*)&local, sizeof(local)) == -1) {
            std::cerr << "Bind Failed" << std::endl;
            exit(-1);
        }
    }
    static void Listen(int sock) {
        if(listen(sock, 5) == -1) {
            std::cerr << "Bind Failed" << std::endl;
            exit(-1);
        }
    }
    static int Accept(int sock) {
        struct sockaddr_in remote;
        socklen_t len = sizeof(remote);
        int fd = accept(sock, (sockaddr*)&remote, &len);
        return fd;
    }
    static void Connect(int sock, std::string ip, uint16_t port) {
        struct sockaddr_in remote;
        memset(&remote, 0, sizeof(remote));
        remote.sin_family = AF_INET;
        remote.sin_port = htons(port);
        remote.sin_addr.s_addr = inet_addr(ip.c_str());
        if(connect(sock, (struct sockaddr*)&remote, sizeof(remote)) == 0) {
            std::cout << "connect success" << std::endl;
        }else{
            std::cout << "connect failed" << std::endl;
            exit(-1);
        }
    }
};
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
- Select.cpp
 
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <vector>
#define m_port 8080
#define fSetNUM 1024
int main() {
    std::vector<int> fd_array(fSetNUM, -1); // 记录需要等待的套接字
    int listen_fd = Sock::InitSocket();
    Sock::Bind(listen_fd, m_port);
    Sock::Listen(listen_fd);
    // int in_sock = Sock::Accept(listen_fd);  // 不在此阻塞等待,accept只用来拷贝数据
    fd_set readBitset;
    while(true) {
        fd_array[0] = listen_fd;
        int max_fd = listen_fd;
        FD_ZERO(&readBitset);
        for(int i = 0; i < fSetNUM; i++) {
            if(fd_array[i] == -1) continue; //存在监听失败的套接字
            FD_SET(fd_array[i], &readBitset);
            max_fd = std::max(max_fd, fd_array[i]); // 得到最大的fd
        }
        struct timeval timeout = {3,0};
        int OKNum = select(max_fd + 1, &readBitset, nullptr, nullptr, &timeout);
        if(OKNum == -1) {
            std::cout << "select wait failed" << std::endl;
        }else if(OKNum == 0) {
            std::cout << "select timeout" << std::endl;
        }else{
            // 套接字有事件就绪,不处理会一直通知。
            for(int i = 0; i < fSetNUM; i++) {
                if(fd_array[i] == -1) continue; //存在监听失败的套接字
                if(FD_ISSET(fd_array[i], &readBitset)) {
                    //读事件就绪,需要得到就绪事件的fd, 通过readBitset返回用户
                    if(fd_array[i] == listen_fd) {
                        //新连接来了,添加新的套接字
                        int sock = Sock::Accept(listen_fd);
                        int j = 1;
                        for(; j < fSetNUM  ; j++) {
                            if(fd_array[j] == -1) break;
                        }
                        if(j < fSetNUM) fd_array[j] = sock;
                        else {
                            close(sock); // 维护的文件描述符超出了select的承载范围  
                        }
                    }else{
                        //普通连接的套接字就绪
                        //read,recv——不会阻塞等待了。但需要处理粘包问题
                        char Buff[1024] = {0};
                        ssize_t s = recv(fd_array[i], Buff, sizeof(Buff) - 1, 0);
                        if(s > 0) {
                            Buff[s] = 0;
                            std::cout << "client[" << fd_array[i] << "] :" << Buff << std::endl;  
                        }else if(s == 0){  // 对端连接关闭了
                            close(fd_array[i]);
                            fd_array[i] = -1;
                        }else{
                            close(fd_array[i]);
                            fd_array[i] = -1;
                        }
                    }
                }
            }
        }
    }
    return 0;
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 4.多路转接之poll
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout)1
2
- struct pollfd:
 struct pollfd { int fd; /* file descriptor */ short events; /* 用户输入的 */ short revents; /* 系统返回的 */ };1
2
3
4
5
events POLLIN POLLPRI POLLOUT POLLRDHUP - - - 有读数据 有优先数据 写非阻塞(可写) 对方流套接字关闭 - - - revents POLLIN POLLPRI POLLOUT POLLRDHUP POLLERR POLLHUP POLLNVAL 如TCP带外数据 错误连接 挂起 无效请求 
- nfds:文件描述符数组的大小。若对应的fd为负数,相应的event被忽略且revents返回0
 - timeout:阻塞等待时间,单位毫秒。设置为0——非阻塞,即便没有文件描述符就绪也会立即返回;设为负数,永久阻塞。
 - 返回值:成功返回正数;失败返回-1;若timeout到达并且没有文件描述符就绪,返回0.
 
#include <poll.h>
#include <unistd.h>
#include <iostream>
int main() {
    struct pollfd fdSet;
    fdSet.fd = 0; //标准输入
    fdSet.events = POLLIN;
    fdSet.revents = 0;
    while(true) {
        int ret = poll(&fdSet, 1, -1);
        if(ret == 0) {
            std::cout << "time out"  << std::endl;
        }else if(ret == -1) {
            std::cout << "error" << std::endl;
        }else{
            if(fdSet.revents & POLLIN) { //读事件就绪
                char buff[1024] = {0};
                ssize_t s = read(fdSet.fd, buff, sizeof(buff) -1);
                if(s > 0) {
                    std::cout << buff << std::endl;
                }
            } 
        }
    }
    
    return 0;
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 5.多路转接之epoll
# 函数接口
#include <sys/epoll.h> int epoll_create(int size);1
2
size :代表内核会为多少文件描述符开空间,但是在新内核中无效,必须大于0以兼容旧系统。
返回值:成功返回非负的文件描述符,需要close;失败返回-1
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);1
2
- epfd:
 epoll_create创建的文件描述符,指向一个 epoll instance- op: 用户设置系统对文件描述符fd的操作类型:
 EPOLL_CTL_ADD: 增加fd到epfd;EPOLL_CTL_MOD: 修改fd的event类型;EPOLL_CTL_DEL:从epfd删除fd。- fd:需要等待的文件描述符
 - event:
 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };1
2
3
4
5
6
7
8
9
10
11其中uint32_t events可选:
EPOLLIN:关联文件可用于读取操作。 EPOLLOUT:相关文件可用于写入(2)操作。只要用户利用epoll_ctl设置,就会触发一次写就绪 EPOLLRDHUP (since Linux 2.6.17):流套接字对等端关闭连接,或关闭写入连接的一半。(此标志对于编写简单代码以检测对等机关闭特别有用,当使用边缘触发监控时。) EPOLLPRI:存在可用于读取(2)操作的紧急数据。 EPOLLERR:关联的文件描述符出现错误情况。epoll_ wait(2)将始终等待此事件;没有必要在事件中设置它。 EPOLLHUP:关联的文件描述符发生挂起。epoll_ wait(2)将始终等待此事件;没有必要在事件中设置它。 EPOLLET:设置关联文件描述符的边缘触发行为。epoll的默认行为是水平触发的。 EPOLLONESHOT (since Linux 2.6.2):设置关联文件描述符的一次性行为。这意味着在使用epoll_wait(2)拉出事件后,关联的文件描述符在内部被禁用,epoll接口不会报告其他事件。用户必须使用EPOLL_CTL_MOD 。1
2
3
4
5
6
7
8
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);1
2
- events:输出型参数,系统告诉用户哪些文件描述符对应的时间就绪了。
 - maxevents:输出型参数
 - 返回值:返回就绪的文件描述符的个数,如果在timeout时间内没有任何文件就绪就返回0;错误返回-1.
 
#include <sys/epoll.h>
#include "Socket.hpp"
#include <unistd.h>
#define port 8080
#define NUM 32
 
int main() {
    int epfd = epoll_create(64);
    int listen_fd = Sock::InitSocket();
    Sock::Bind(listen_fd, port);
    Sock::Listen(listen_fd);
    // 将监听套接字添加到内核
    struct epoll_event evt;
    evt.events = EPOLLIN;
    evt.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &evt);
    struct epoll_event revt[NUM]; // 保存就绪的事件
    // 事件循环
    while(true) {
        int OKNum = epoll_wait(epfd, revt, NUM, 1000); //获取就绪列表
        if(OKNum == 0) {
            std::cout << "Time Out" << std::endl;
        }else if(OKNum == -1) {
            std::cout << "epoll error" << std::endl;
        }else{
            for(int i = 0; i < OKNum; i++) {
                int OKfd = revt[i].data.fd;
                if(revt[i].events & EPOLLIN) {  // 读事件就绪
                    if(OKfd == listen_fd) {
                        int newfd = Sock::Accept(listen_fd);
                        if(newfd == -1) {
                            std::cout << "Accept New FD failed" << std::endl;
                            continue;
                        } 
                        struct epoll_event newEvt;
                        newEvt.events = EPOLLIN;
                        newEvt.data.fd = newfd;
                        epoll_ctl(epfd, EPOLL_CTL_ADD, newfd, &newEvt);
                    }else { //其他文件描述符
                        char buff[1024];
                        ssize_t s = recv(OKfd, buff, sizeof(buff) - 1, 0);
                        if(s == 0) {
                            std::cout << "remote client quit" << std::endl;
                            close(OKfd);
                            epoll_ctl(epfd, EPOLL_CTL_DEL, OKfd, nullptr); //删除epoll fd
                        }else if(s > 0) {
                            buff[s] = 0;
                            std::cout << "client[" << OKfd << "]: " <<  buff << std::endl;
                        }else {
                            std::cout << "" << std::endl;
                            close(OKfd);
                            epoll_ctl(epfd, EPOLL_CTL_DEL, OKfd, nullptr); 
                        }
                    }
                }else if(revt[i].events & EPOLLOUT) { //写事件就绪
                    // ,,,
                }
            }
        }
    }
    return 0;
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# epoll原理
epoll_create创建了一个红黑树(维护一些文件描述符)、等待就绪队列、回调函数。
epoll_ctl用来添加红黑树的一个节点,并对相应的描述符设置关心的事件,系统根据设置的事件添加相应的回调机制。
epoll_wait获取就绪队列中就绪的事件。
硬件将数据拷贝到内核会触发中断,此时执行回调函数,将拷贝到内核的数据添加到就绪队列中,等待上层应用调用。
# epoll的工作方式
- ET(边缘触发)模式——epoll设置EPOLLET
 
ET通知一次,为了保证数据能被读完,则会要求程序员循环读取。若阻塞读取,前一次读取刚好读取完,则最后一次读取会被卡住,所有ET模式下读取要设置成非阻塞读取。
- LT(水平触发)模式:只要有数据会一直通知。——select、poll、epoll的默认模式
 
