epoll&&webserver

epoll

I/O多路复用

I/O多路复用(I/O Multiplexing)是一种允许单个线程或进程同时监视多个文件描述符(通常是网络套接字)的可读、可写和异常等事件的技术。

IO多路复用——深入浅出理解select、poll、epoll的实现 - 知乎 (zhihu.com)

当然,我们在这里使用的是最现代,最高效的epoll系统调用。

在Linux系统中,当需要处理多个文件描述符时,epoll可以用来监控这些文件描述符的读写事件。当至少一个文件描述符准备好进行I/O操作时,epoll会通知应用程序,从而允许程序执行非阻塞的I/O操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 
* epoll_data_t 是一个联合体,用于存储 epoll_event 结构体中的用户数据。
* 它可以存储指针 ptr、文件描述符 fd、32位整数 u32 或 64位整数 u64。
*/
typedef union epoll_data
{
void *ptr; /* 指针类型 */
int fd; /* 文件描述符 */
uint32_t u32; /* 32位无符号整数 */
uint64_t u64; /* 64位无符号整数 */
} epoll_data_t;

/*
* epoll_event 结构体定义了一个 epoll 事件。
* 它包含了事件 events 和用户数据 data。
*/
struct epoll_event
{
uint32_t events; /* Epoll events,用于指定感兴趣的事件类型 */
epoll_data_t data; /* User data variable,用于存储用户数据 */
} __EPOLL_PACKED;

epoll 相关的主要函数有:

  1. epoll_create()epoll_create1():创建一个 epoll 实例并返回一个文件描述符,用于后续的 epoll 操作。
  2. epoll_ctl():用于向 epoll 实例添加、修改或删除要监控的文件描述符及其事件。
  3. epoll_wait()epoll_pwait():等待 epoll 实例中指定的文件描述符上的一个或多个事件发生,返回发生事件的文件描述符及其事件类型。

epoll_ctl()的操作类型:

操作类型 描述
EPOLL_CTL_ADD 添加一个文件描述符到 epoll 实例中,并且关联一个事件掩码(event mask),用于监听特定的事件。
EPOLL_CTL_MOD 修改已经添加到 epoll 实例中的文件描述符的事件掩码。这可以用于改变监听的事件类型或更新兴趣列表。
EPOLL_CTL_DEL 从 epoll 实例中删除一个文件描述符,不再监听该文件描述符上的事件。

epoll 支持两种工作模式:

  1. 水平触发(Level Triggered, LT):当文件描述符就绪时,epoll_wait() 会通知处理程序。如果处理程序没有完全处理该文件描述符上的所有数据,下次调用 epoll_wait() 时,它还会再次通知同一个文件描述符。
  2. 边缘触发(Edge Triggered, ET):当文件描述符的状态发生变化时,epoll_wait() 会通知处理程序。处理程序必须处理完所有就绪的 I/O 操作,因为如果再次调用 epoll_wait(),它可能不会因为同一个文件描述符而再次被唤醒,除非又有新的 I/O 事件发生。

注意ET模式的文件描述符应该是非阻塞的,这是因为【Linux Socket C++】为什么IO复用需要用到非阻塞IO?EAGAIN的简单介绍与应用_c++ eagain-CSDN博客

我们这里使用效率更高的ET模式。

epoller.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#ifndef EPOLLER_H
#define EPOLLER_H

#include <cstdint> //无符号整数
#include <sys/epoll.h> //epoll
#include <fcntl.h> //一系列与文件描述符相关的函数,如 fcntl、open、close 等。
#include <unistd.h> //read、write、close、dup2
#include <assert.h>
#include <vector>
#include <errno.h>

class Epoller{
public:
explicit Epoller(int MaxEvent = 1024);

~Epoller();

bool AddFd(int fd, uint32_t events);

bool ModFd(int fd, uint32_t events);

bool DelFd(int fd);

int Wait(int timeoutMs = -1);

int GetEventFd(size_t i) const;

uint32_t GetEvents(size_t i) const;

private:
int epollFd_;

std::vector<struct epoll_event> events_;

};



#endif

epoller.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include "epoller.h"

Epoller::Epoller(int MaxEvent )
:epollFd_(epoll_create(MaxEvent)), events_(MaxEvent)
{
assert(epollFd_ >= 0 && events_.size() > 0);
}

Epoller::~Epoller()
{
close(epollFd_);
}

bool Epoller::AddFd(int fd, uint32_t events)
{
if(fd<0) return false;
epoll_event ev = {0};
ev.events = events;
ev.data.fd = fd;
return 0 == epoll_ctl(epollFd_,EPOLL_CTL_ADD,fd,&ev);

}

bool Epoller::ModFd(int fd, uint32_t events)
{
if(fd<0) return false;
epoll_event ev = {0};
ev.events = events;
ev.data.fd = fd;
return 0 == epoll_ctl(epollFd_,EPOLL_CTL_MOD,fd,&ev);
}

bool Epoller::DelFd(int fd)
{
if(fd<0) return false;
epoll_event ev = {0};
//ev.data.fd = fd;
return 0 == epoll_ctl(epollFd_,EPOLL_CTL_DEL,fd,&ev);

}

int Epoller::Wait(int timeoutMs )
{
return epoll_wait(epollFd_, &events_[0], static_cast<int>(events_.size()), timeoutMs);
}

int Epoller::GetEventFd(size_t i) const
{
assert(i < events_.size() && i >= 0);
return events_[i].data.fd;
}

uint32_t Epoller::GetEvents(size_t i) const
{
assert(i < events_.size() && i >= 0);
return events_[i].events;
}

webserver

到达webserver最高层webserver!这里我们主要是将将各个类结合起来。

socket 网络编程——端口复用技术(setsockopt())(linux下多个进程监听同一个端口)_linux setsockopt-CSDN博客

Reactor模式与Proactor模式:

两种高效的事件处理模式:Reactor模式和Proactor模式_proactor模式和reactor模式应用场景-CSDN博客

需要处理大量并发连接的网络服务器中,Reactor模式可能是更好的选择,因为它可以更有效地利用系统资源。而在需要处理大量I/O操作的应用中,Proactor模式可能更适合,因为它可以简化I/O操作的处理。

由于我们搭建的是tinywebserver,因此这里我们使用的是Reactor模式。

//TODO

webserver.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76


#ifndef WEBSERVER_H
#define WEBSERVER_H

#include <arpa/inet.h> //ip转换
#include <assert.h>
#include <cstdint>
#include <errno.h>
#include <fcntl.h> //文件控制
#include <memory>
#include <netinet/in.h> //互联网协议家族的定义
#include <sys/socket.h> //socket
#include <unistd.h> //POSIX操作系统API的访问
#include <unordered_map>

#include "epoller.h"
#include "../log/log.h"
#include "../timer/heaptimer.h"
#include "../pool/sqlconnpool.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnRAII.h"
#include "../http/httpconn.h"


class WebServer{
public:
WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openlog, int loglevel, int logQueSize);

~WebServer();

void Start();

private:
bool InitSocket_();
void InitEventMode_(int trigMode);
void AddClient_(int fd, sockaddr_in addr);

void DealListen_();
void DealWrite_(HttpConn* client);
void DealRead_(HttpConn* client);

void SendError_(int fd, const char* info);
void ExtentTime_(HttpConn* Client);
void CloseConn_(HttpConn* client);

void OnRead_(HttpConn* client);
void OnWrite_(HttpConn* client);
void OnProcess(HttpConn* client);

static const int MAX_FD = 65536;

static int SetFdNonblock(int fd);

int port_;
bool openLinger_;
int timeoutMS_;
bool isClose_;
int listenFd_;
char* srcDir_;

uint32_t listenEvent_;
uint32_t connEvent_;

std::unique_ptr<HeapTimer> timer_;
std::unique_ptr<ThreadPool> threadpool_;
std::unique_ptr<Epoller> epoller_;
std::unordered_map<int, HttpConn> users_;
};


#endif

webserver.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#include "webserver.h"


WebServer::WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize)
:port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
{
srcDir_ = getcwd(nullptr, 256);
assert(srcDir_);
strncat(srcDir_, "/resources/", 16);
HttpConn::userCount = 0;
HttpConn::srcDir = srcDir_;

SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

InitEventMode_(trigMode);
if (!InitSocket_()) {
isClose_ = true;
}

if (openLog) {
Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
if (isClose_) {
LOG_ERROR("========== Server init error!==========");
} else {
LOG_INFO("========== Server init ==========");
LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger ? "true" : "false");
LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
(listenEvent_ & EPOLLET ? "ET" : "LT"),
(connEvent_ & EPOLLET ? "ET" : "LT"));
LOG_INFO("LogSys level: %d", logLevel);
LOG_INFO("srcDir: %s", HttpConn::srcDir);
LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum,
threadNum);
}
}
}

WebServer::~WebServer() {
close(listenFd_);
isClose_ = true;
free(srcDir_);
SqlConnPool::Instance()->ClosePool();
}

void WebServer::Start() {
int timeMS = -1;
if (!isClose_) {
LOG_INFO("========== Server start ==========");
}
while (!isClose_) {
if (timeoutMS_ > 0)
timeMS = timer_->GetNextTick();
int eventCnt = epoller_->Wait(timeMS);
for (int i = 0; i < eventCnt; i++) {
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
if (fd == listenFd_) {
DealListen_();
} else if (events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
} else if (events & EPOLLIN) {
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
} else if (events & EPOLLOUT) {
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {
LOG_ERROR("Unexpected event");
}
}
}
}

bool WebServer::InitSocket_() {
int ret;
struct sockaddr_in addr;
if (port_ > 65535 || port_ < 1024) {
LOG_ERROR("Port:%d error!", port_);
return false;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port_);
struct linger optLinger = {0};
if (openLinger_) {
optLinger.l_onoff = 1; //启用linger
optLinger.l_linger = 1; //当关闭套接字时延时1s
}
listenFd_ = socket(AF_INET, SOCK_STREAM, 0);
// ipv4 流套接字 自动选择默认协议
if (listenFd_ < 0) {
LOG_ERROR("Create socket error!", port_);
return false;
}

//设置优雅关闭;
ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger,
sizeof(optLinger));
if (ret < 0) {
close(listenFd_);
LOG_ERROR("Init linger error!", port_);
return false;
}

//端口复用
int optval = 1;
ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval,
sizeof(int));
if (ret == -1) {
LOG_ERROR("set socket setsockopt error !");
close(listenFd_);
return false;
}

ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));
if (ret < 0) {
LOG_ERROR("Bind Port:%d error!", port_);
close(listenFd_);
return false;
}

ret = listen(listenFd_, 6);
if (ret < 0) {
LOG_ERROR("Listen port:%d error!", port_);
close(listenFd_);
return false;
}
ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN);
if (ret == 0) {
LOG_ERROR("Add listen error!");
close(listenFd_);
return false;
}
SetFdNonblock(listenFd_);
LOG_INFO("Server port:%d", port_);
return true;
}
void WebServer::InitEventMode_(int trigMode) {
//初始化为水平触发模式
listenEvent_ = EPOLLRDHUP;
connEvent_ = EPOLLONESHOT | EPOLLRDHUP;
switch (trigMode) {
case 0:
break;
case 1:
connEvent_ |= EPOLLET;
break;
case 2:
listenEvent_ |= EPOLLET;
break;
case 3:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
default:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
}
HttpConn::isET = (connEvent_ & EPOLLET);
}

void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);
if (timeoutMS_ > 0) {
timer_->add(fd, timeoutMS_,
std::bind(&WebServer::CloseConn_, this, &users_[fd]));
}
epoller_->AddFd(fd, EPOLLIN | connEvent_);
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}

//接受连接
void WebServer::DealListen_() {
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
if (fd <= 0)
return;
else if (HttpConn::userCount >= MAX_FD) {
SendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);
} while (listenEvent_ & EPOLLET);
}

void WebServer::DealWrite_(HttpConn *client) {
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));
}

void WebServer::DealRead_(HttpConn *client) {
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));
}

void WebServer::SendError_(int fd, const char *info) {
assert(fd > 0);
int ret = send(fd, info, strlen(info), 0);
if (ret < 0) {
LOG_WARN("send error to client[%d] error!", fd);
}
close(fd);
}

void WebServer::ExtentTime_(HttpConn *client) {
assert(client);
if (timeoutMS_ > 0) {
timer_->adjust(client->GetFd(), timeoutMS_);
}
}

void WebServer::CloseConn_(HttpConn *client) {
assert(client);
LOG_INFO("Client[%d] quit!", client->GetFd());
epoller_->DelFd(client->GetFd());
client->Close();
}

void WebServer::OnRead_(HttpConn *client) {
assert(client);
int res = -1;
int readErrno = 0;
res = client->read(&readErrno);
if (res <= 0 && readErrno != EAGAIN) {
CloseConn_(client);
return;
}
OnProcess(client);
}

void WebServer::OnWrite_(HttpConn *client) {
assert(client);
int res = -1;
int writeErrno = 0;
res = client->write(&writeErrno);
if (client->ToWriteBytes() == 0) {
if (client->IsKeepAlive()) {
OnProcess(client);
return;
}
} else if (res < 0) {
if (writeErrno == EAGAIN) {
epoller_->AddFd(client->GetFd(), connEvent_ | EPOLLOUT);
return;
}
}

CloseConn_(client);
}

void WebServer::OnProcess(HttpConn *client) {
if (client->process()) {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
} else {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
}
}

int WebServer::SetFdNonblock(int fd) {
assert(fd > 0);
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK);
}