IO 多路转接:Reactor 框架与 Epoll 机制的封装设计
深入讲解基于 Epoll 与 Reactor 模式的高并发网络服务器设计与实现。内容涵盖水平触发与边缘触发模式差异,Socket 与 Epoll 接口封装,连接管理对象设计,事件循环任务派发流程。重点解析非阻塞 IO 配置、读写缓冲区处理、回调机制绑定,以及引入线程池时解决 shared_ptr 循环引用的 weak_ptr 方案。

深入讲解基于 Epoll 与 Reactor 模式的高并发网络服务器设计与实现。内容涵盖水平触发与边缘触发模式差异,Socket 与 Epoll 接口封装,连接管理对象设计,事件循环任务派发流程。重点解析非阻塞 IO 配置、读写缓冲区处理、回调机制绑定,以及引入线程池时解决 shared_ptr 循环引用的 weak_ptr 方案。

在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,如何高效处理海量 IO 请求始终是开发者绕不开的核心命题。传统一连接一线程的同步阻塞模型,因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决等待 IO 时线程闲置的资源浪费困境。
正是在这样的需求下,基于事件驱动与 IO 多路转接的 Reactor 模式应运而生。它以少量线程监听多 IO、事件触发业务处理的核心逻辑,成为解决高并发 IO 的经典架构。小到 Netty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的关键钥匙。
本文围绕 Reactor 模式实现展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,帮助你构建出一个基于 Reactor 模式的服务器。
Epoll 有两种工作模式:水平触发(Level Triggered,简称 LT)和边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于何时通知应用程序某个文件描述符(fd)就绪,直接影响高并发 IO 处理的效率和编程复杂度。
epoll 仅在文件描述符的就绪状态发生变化瞬间通知一次,之后无论该状态是否持续,都不再通知。即只有在读写资源从没就绪到就绪的时候才会进行通知。我们通常会认为 ET 模式的效率更高:
下面我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过事件驱动和 IO 多路转接技术,高效处理海量并发连接。
关于网络套接字的细节并非本文重点,此处直接展示实现代码:
const std::string defaultip_ = "0.0.0.0";
enum SockErr { SOCKET_Err, BIND_Err };
class Sock {
public:
Sock(uint16_t port) : port_(port), listensockfd_(-1) {}
void Socket() {
listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensockfd_ < 0) {
Log(Fatal) << "socket fail";
exit(SOCKET_Err);
}
Log(Info) << "socket success";
}
void Bind() {
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(port_);
inet_pton(AF_INET, defaultip_.c_str(), &server.sin_addr);
if (bind(listensockfd_, (struct sockaddr*)&server, sizeof(server)) < 0) {
Log(Fatal) << "bind fail";
exit(BIND_Err);
}
Log(Info) << "bind success";
}
void Listen() {
if (listen(listensockfd_, 10) < 0) {
Log(Warning) << "listen fail";
}
Log(Info) << "listen success";
}
int Accept() {
struct sockaddr_in client;
socklen_t len = sizeof(client);
int fd = accept(listensockfd_, (sockaddr*)&client, &len);
return fd;
}
int Accept(std::string& ip, uint16_t& port) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
int fd = accept(listensockfd_, (sockaddr*)&client, &len);
port = ntohs(client.sin_port);
char bufferip[64];
inet_ntop(AF_INET, &client.sin_addr, bufferip, sizeof(bufferip) - 1);
ip = bufferip;
return fd;
}
int Get_fd() { return listensockfd_; }
~Sock() { close(listensockfd_); }
private:
uint16_t port_;
int listensockfd_;
};
关于 Epoll 具体的细节可参考相关文档,此处直接展示封装后的接口使用:
enum EpollErr { CREAR_Err };
class Epoll {
public:
Epoll() {
_epfd = epoll_create(1);
if (_epfd < 0) {
Log(Fatal) << "epoll_create fail";
exit(CREAR_Err);
}
Log(Info) << "epoll create success";
}
void Add_fd(int fd, uint32_t event) {
struct epoll_event epevt;
epevt.events = event;
epevt.data.fd = fd;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &epevt) < 0) {
Log(Warning) << "epoll add error : " << strerror(errno);
}
Log(Info) << "epoll add success , fd : " << fd;
}
void Del_fd(int fd) {
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0) {
Log(Warning) << "epoll del error : " << strerror(errno);
}
Log(Info) << "epoll del success , fd : " << fd;
}
void Mod_fd(int fd, event) {
epevt;
epevt.events = event;
epevt.data.fd = fd;
((_epfd, EPOLL_CTL_MOD, fd, &epevt) < ) {
(Warning) << << (errno);
}
}
{
(_epfd, ep_array, max_size, timeout);
}
:
_epfd;
};
因为 TCP 通信传递的是字节流,无法确定每次获取到的数据是一个有效报文,因此需要将所有获取到的数据先存储起来:
std::shared_ptr<Connection>,保证跳转到外界执行代码时依旧可以拿到文件描述符的相关资源。class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;
class Connection {
public:
Connection(int fd, func_t recv, func_t sender, func_t exception)
: _fd(fd), _Recv(recv), _Sender(sender), _Exception(exception) {}
private:
int _fd; // 对应的文件描述符
std::string _inbuffer; // 输入缓冲区
std::string _outbuffer; // 输出缓冲区
public:
func_t _Recv; // 处理接收的逻辑
func_t _Sender; // 处理发送的逻辑
func_t _Exception; // 处理出现异常时的逻辑
};
在该类中,后续需要先缓冲区中进行读写操作:
std::string& Get_Inbuffer() { return _inbuffer; }
std::string& Get_Outbuffer() { return _outbuffer; }
void Add_In(const std::string& mes) { _inbuffer += mes; }
void Add_Out(const std::string& mes) { _outbuffer += mes; }
int Get_fd() { return _fd; }
可能后续还需要使用一些操作,在后面再进行补充。
class Rserver {
static const int array_num_max = 1024;
public:
Rserver(uint16_t port) : _sock_ptr(new Sock(port)), _epoll_ptr(new Epoll) {}
private:
std::shared_ptr<Sock> _sock_ptr;
std::shared_ptr<Epoll> _epoll_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _epl_array[array_num_max];
};
在 ET 模式下,我们要保证一次将所有的资源都获取上来,因此我们需要 while 式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态。
此时使用 int fcntl(int fd, int op, ...) 接口进行设置:
int SetNoBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
fl |= O_NONBLOCK;
int n = fcntl(fd, F_SETFL, fl);
return n;
}
首先就是普通文件的接收方法:
对于第二步,我们可以先向外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调:
using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;
class Rserver {
static const int array_num_max = 1024;
public:
Rserver(uint16_t port, callback_func Onmessage)
: _sock_ptr(new Sock(port)), _epoll_ptr(new Epoll), _Onmessage(Onmessage) {}
private:
std::shared_ptr<Sock> _sock_ptr;
std::shared_ptr<Epoll> _epoll_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _epl_array[array_num_max];
callback_func _Onmessage; // 负责回调
};
关于普通文件描述符的接收问题,需要注意的就是 read 的不同返回值进行不同的处理:
void Recv(std::shared_ptr<Connection> con_ptr) {
// 1. 将缓冲区中的数据全部读取到 Connection 中
// 2. 调用外界函数判断是否含有一个完整的报文
// 3. 先客户端返回结果
char inbuffer[1024];
while (1) {
int n = read(con_ptr->Get_fd(), inbuffer, sizeof(inbuffer) - 1);
if (n > 0) {
// 有数据
inbuffer[n] = 0;
con_ptr->Add_In(inbuffer);
} else if (n == 0) {
// 对方关闭了文件,断开连接了
// 1. 将文件描述符从 epoll 模型中移除
// 2. 将文件描述符从哈希表中移除
// 3. 将文件描述符关闭
int fd = con_ptr->Get_fd();
_epoll_ptr->Del_fd(fd);
_connections.erase(fd);
close(fd);
return;
} else {
// 此次有两种情况:1. 数据读取完了 2. 读取出错了
if (errno == EAGAIN) // 读取完了
break;
else // 出错了
{
// 此处调用文件对应的异常处理
con_ptr->_Exception(con_ptr);
return;
}
}
}
std::string ret = _Onmessage(con_ptr);
con_ptr->Add_Out(ret);
}
接下来就是编写发送的接口:
思考:对于发送接口是否需要判断,写事件是否就绪???
在大多数时候,写事件都是就绪的;因此如果将其加入到判断中 epoll_wait 就会频繁的返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中。
void Sender(std::shared_ptr<Connection> con_ptr) {
// 进行数据的发送
// 直接进行发送
std::string& outbuffer = con_ptr->Get_Outbuffer();
int fd = con_ptr->Get_fd();
// 循环式的进行发送
while (1) {
int n = write(fd, outbuffer.c_str(), outbuffer.size());
if (n > 0) {
// 1. 将已经发送的数据从字符串中移除
outbuffer.erase(0, n);
if (outbuffer.empty()) break; // 已经写完了
} else if (n == 0) {
break;
} else {
if (errno == EAGAIN) // 已经写完了
break;
else // 出错了
{
// 此处调用文件对应的异常处理
con_ptr->_Exception(con_ptr);
return;
}
}
}
// 判断发送缓冲区中是否还有数据
if (!outbuffer.empty()) {
// 发送缓冲区满了
_epoll_ptr->Mod_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
} else {
// 缓冲区没满,不需要对写事件进行检测
_epoll_ptr->Mod_fd(fd, EPOLLIN | EPOLLET);
}
}
最后一步就是对异常情况的处理了:
void Exception(std::shared_ptr<Connection> con_ptr) {
int fd = con_ptr->Get_fd();
_epoll_ptr->Del_fd(fd);
_connections.erase(fd);
close(fd);
}
对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。
在创建为新的文件描述符创建 Connection 对象的时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是 std::shared_ptr<Connection>,并且上述的 Recv, Sender, Expection 都是类成员函数,都有一个隐含的参数 this 指针,所以对于可调用对象在进行传参的是否要使用 bind 进行绑定。
void Accept(std::shared_ptr<Connection> con_ptr) {
// 1. 获取文件描述符
while (1) {
int newfd = _sock_ptr->Accept();
if (newfd >= 0) {
// 有新连接
// 2. 将文件描述符设置为非阻塞
// 3. 将文件加入到 epoll 模型中
// 4. 将文件描述符加入到哈希表中
if (SetNoBlock(newfd) < 0) {
Log(Warning) << "set no block fail";
continue;
}
_epoll_ptr->Add_fd(newfd, EPOLLIN | EPOLLET);
std::shared_ptr<Connection> con_ptr(new Connection(
newfd,
std::bind(&Rserver::Recv, this, std::placeholders::_1),
std::bind(&Rserver::Sender, this, std::placeholders::_1),
std::bind(&Rserver::Exception, this, std::placeholders::_1)
));
_connections.emplace(newfd, con_ptr);
} else {
if (errno == EAGAIN)
break;
else {
// 出错了
Log(Warning) << "accept fail";
}
}
}
}
void Init() {
// 1. 创建套接字
// 2. 进行绑定
// 3. 设置监听模式
// 4. 将网络套接字加入到 epoll 模型中,并创建 Connection 加入到_connections 中进行管理
_sock_ptr->Socket();
_sock_ptr->Bind();
_sock_ptr->Listen();
int listensock = _sock_ptr->Get_fd();
SetNoBlock(listensock);
_epoll_ptr->Add_fd(listensock, EPOLLIN | EPOLLET);
std::shared_ptr<Connection> conptr(new Connection(
listensock,
std::bind(&Rserver::Accept, this, std::placeholders::_1),
nullptr,
nullptr
));
_connections.emplace(listensock, conptr);
// 将 IP 和端口号设置为可复用的
int opt = 1;
setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
因为我们之前已经将每个文件描述符对应的处理方法加入到了 Connection 对象中了,因此直接进行调用即可。
在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。
void Dispatcher(int n) {
for (int i = 0; i < n; i++) {
int fd = _epl_array[i].data.fd;
short events = _epl_array[i].events;
auto& con_ptr = _connections[fd];
// 将异常处理,转化为读写处理
if (events & EPOLLERR) {
events |= (EPOLLIN | EPOLLOUT);
}
if (_connections.count(fd) && con_ptr->_Recv) {
con_ptr->_Recv(con_ptr);
}
if (_connections.count(fd) && con_ptr->_Sender) {
con_ptr->_Sender(con_ptr);
}
}
}
服务器的主循环就比较简单了,直接进行 epoll_wait 即可,将操作系统中的就绪队列拿到:
void Run() {
while (1) {
int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);
if (n > 0) {
Dispatcher(n);
} else if (n == 0) {
Log(Info) << "no message";
} else {
Log(Warning) << "epoll wait fail";
}
}
}
以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。
此处我们引入手动实现序列化和反序列化的代码,来实现一个在线计算器:
std::string Onmessage(std::shared_ptr<Connection> con_ptr) {
static Calculator cal;
std::string& inbuffer = con_ptr->Get_Inbuffer();
std::string ret = cal(inbuffer); // 对请求进行处理,返回一个序列化后的字符串
return ret;
}
对于引入线程池,此代码就需要进行重构了。在 Connection 对象中我们需要存储一个 Server 的回指指针,但是此处不能直接使用 shared_ptr<> 否则会出现循环引用,因此要采用 weak_ptr 来实现。
但是注意:我们是在类的成员函数中使用其 this 指针来构建一个 shared_ptr,从而初始化 weak_ptr;
如果在类的成员函数中,直接通过 this 指针创建新的 shared_ptr,会导致两个独立的 shared_ptr 管理同一个对象,但它们的引用计数是分开的:
此处我们需要使用 enable_shared_from_this 继承来进行解决:
此时,通过调用 shared_from_this() 方法,可返回一个指向自身的 shared_ptr,这个新的 shared_ptr 会复用原有的引用计数,避免双重释放。
服务器类定义:
class Rserver : public std::enable_shared_from_this<Rserver> {
public:
// ......
};
在 Connection 类中增加一个成员:weak_ptr _loop_svr.
对于创建 Connection 对象部分也要进行修改:
std::shared_ptr<Connection> conptr(new Connection(
listensock,
shared_from_this(),
std::bind(&Rserver::Accept, this, std::placeholders::_1),
nullptr,
nullptr
));
在第二个实参中,传入 this 指针来构建 Connection 中的 weak_ptr。
关于线程池部分的代码因为文章篇幅就不再叙述了,读者可在此基础上扩展线程池功能以进一步提升性能。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online