Linux 高级 IO:基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器实现
前言
本文介绍基于边缘触发(ET)模式的 epoll 机制构建 Reactor 反应堆模型的 TCP 服务器实现。文章将在水平触发(LT)模式的基础上进行讲解,重点阐述如何将 epoll 设置为 ET 模式并实现完整的 Reactor 架构。
一、前置知识
- epoll 工作模式:epoll 默认使用 LT(Level Triggered)水平触发模式,也可设置为 ET(Edge Triggered)边缘触发模式。设置方法是在
epoll_ctl 中将关心的事件按位或(|)上 EPOLLET,例如 EPOLLIN | EPOLLET。
- 核心文件结构:
Epoller.hpp:封装 epoll 接口(创建、等待、控制),管理 epfd。
Log.hpp:日志模块,定义全局对象 lg。
Main.cc:主函数入口,调用服务器逻辑。
makefile:编译构建脚本。
nocopy.hpp:禁止拷贝基类,确保服务器和 Epoller 对象不可被拷贝(注意:这并不等同于单例)。
Socket.hpp:封装套接字原生接口。
TcpServer.hpp:包含连接描述类 Connection 及服务器类 TcpServer。
- 建议:在阅读本文前,建议先了解 LT 模式下的 epoll TCP 服务器实现,因为大部分基础逻辑相同。
- 实现思路:通过
epoll_ctl 将 fd 关心的事件与 EPOLLET 组合传入,即可开启 ET 模式。
二、第一阶段,基本框架的实现
Connection 类设计
在 ET 模式下,文件描述符 sock 必须设置为非阻塞,并循环读取,直到 recv 返回 -1 且 errno 为 EAGAIN/EWOULDBLOCK(表示本轮就绪的数据已全部读完),以此清空接收缓冲区。由于 TCP 是面向字节流的,报文可能不完整,因此需要维护用户级输入缓冲区 inbuffer 和输出缓冲区 outbuffer。
- 成员变量:
_sock:文件描述符。
_inbuffer, _outbuffer:使用 std::string 类型,支持动态扩容。
_tcp_server_ptr:使用 std::weak_ptr<TcpServer> 打破循环引用。
_ip, _port:客户端信息。
- 回调函数:
- 定义
func_t = std::function<void(std::shared_ptr<Connection>)>。
_recv_cb, _send_cb, _except_cb:分别处理读、写、异常事件。
- 生命周期管理:
- 析构时关闭 socket。
- 避免
shared_ptr 循环引用,Connection 内部对 TcpServer 使用 weak_ptr。
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
class Connection;
class TcpServer;

// Event masks handed to epoll. EPOLLET is OR-ed in so the descriptor is
// monitored in edge-triggered mode.
// Declared `static const` so that including this header from more than one
// translation unit does not produce duplicate definitions, and so the masks
// cannot be modified by accident at runtime.
static const uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
static const uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

// Size in bytes of the stack buffer used for a single recv() call.
static const int g_buffer_size = 128;

// Common signature of the read / write / exception callbacks stored on a
// Connection; the callback receives the connection it was triggered for.
using func_t = std::function<void(std::shared_ptr<Connection>)>;
// Per-descriptor state of one connection managed by TcpServer.
//
// Owns the socket descriptor (closed in the destructor), the user-level
// input/output byte buffers, and the three event callbacks. The back
// reference to the server is a weak_ptr so that Connection and TcpServer do
// not keep each other alive.
class Connection {
public:
    // sock:           descriptor this connection wraps and will close.
    // tcp_server_ptr: non-owning back reference to the server.
    Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
        : _sock(sock), _tcp_server_ptr(tcp_server_ptr), _port(0) {}

    // The object owns a file descriptor; copying it would make two objects
    // close the same descriptor.
    Connection(const Connection&) = delete;
    Connection& operator=(const Connection&) = delete;

    // Installs the read, write and exception callbacks (any may be empty).
    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb) {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    // Appends bytes to the pending-input / pending-output buffer.
    void AppendInBuffer(const std::string& info) { _inbuffer += info; }
    void AppendOutBuffer(const std::string& info) { _outbuffer += info; }

    // Mutable access so callers can consume data in place.
    std::string& InBuffer() { return _inbuffer; }
    std::string& OutBuffer() { return _outbuffer; }

    int SockFd() const { return _sock; }

    ~Connection() {
        // 0 is a valid descriptor, so only negative values mean "no socket".
        if (_sock >= 0) close(_sock);
    }

private:
    int _sock;
    std::string _inbuffer;   // bytes received but not yet consumed
    std::string _outbuffer;  // bytes queued but not yet sent

public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;
    std::weak_ptr<TcpServer> _tcp_server_ptr;
    std::string _ip;  // peer address, filled in by whoever creates the connection
    uint16_t _port;   // peer port, 0 until filled in
};
Main.cc
使用 shared_ptr 管理 TcpServer,调用 Init 和 Loop。
#include <iostream>
#include <memory>
#include "TcpServer.hpp"
int main() {
std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080));
epoll_svr->Init();
epoll_svr->Loop();
return 0;
}
TcpServer 类实现
- 继承:
enable_shared_from_this<TcpServer> 以安全获取 shared_ptr,nocopy 防止拷贝。
- 成员变量:
_quit:运行标志。
_listensock_ptr, _epoller_ptr:智能指针管理资源。
_connections:哈希表管理所有连接。
_revs:epoll 就绪数组。
- 关键方法:
Loop():死循环执行 Dispatcher。
Dispatcher():检测事件,分发回调。
Init():初始化 Socket,绑定端口,监听,注册 listensock。
Accepter():循环 accept 获取新连接,注册到 epoll。
Recver/Sender/Excepter():读写及异常处理回调。
// First-stage skeleton of the Reactor server: owns the listening socket, the
// epoll wrapper and the table of live connections. Init() and Dispatcher()
// are intentionally empty placeholders at this stage.
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy {
    static const int num = 64;  // capacity of the ready-event array

public:
    TcpServer(uint16_t port)
        : _quit(true), _port(port), _listensock_ptr(new Sock()), _epoller_ptr(new Epoller()) {}

    // Placeholder: socket setup and listen-socket registration go here.
    void Init() {}

    // Placeholder: wait for ready events (timeout in ms, -1 blocks) and dispatch.
    void Dispatcher(int timeout) {}

    // Runs the event loop until _quit becomes true.
    void Loop() {
        _quit = false;
        while (!_quit) {
            Dispatcher(-1);
            PrintConnection();
        }
        _quit = true;
    }

    // Debug helper: prints every managed fd together with its input buffer.
    void PrintConnection() {
        std::cout << "_connections fd list: " << std::endl;
        for (auto& connection : _connections) {
            std::cout << connection.first << ", ";
            std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
        }
        std::cout << std::endl;
    }

    ~TcpServer() {}

private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    // Keyed by socket descriptor. The key type and the element type of _revs
    // were missing in the listing; restored from how the members are used
    // (fd lookups, and EpollerWait taking `struct epoll_event[]`).
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
};
辅助功能
- 非阻塞设置:使用
fcntl 设置 O_NONBLOCK。
- AddConnection:创建 Connection 对象,注册到哈希表和 epoll。
- EnableEvent:动态修改 epoll 关注的事件(如写事件)。
三、第二阶段,引入业务协议
业务层集成
引入网络版本计算器协议(Protocol.hpp, ServerCal.hpp)。TcpServer 新增 _OnMessage 回调,用于通知上层处理数据。
Recver 实现
循环读取数据至 _inbuffer,直到 EWOULDBLOCK 或错误。完成后调用 _OnMessage。
// Read callback for an accepted connection (edge-triggered).
//
// Drains the socket into the connection's input buffer until the kernel
// reports that no more data is available, then hands the connection to the
// upper layer through _OnMessage. On peer close or a hard error the
// connection's exception callback is invoked and _OnMessage is not called.
void Recver(std::shared_ptr<Connection> connection) {
    const int sock = connection->SockFd();
    char buffer[g_buffer_size];
    while (true) {
        const ssize_t n = recv(sock, buffer, sizeof(buffer), 0);
        if (n > 0) {
            // Append exactly n bytes. Treating the buffer as a C string would
            // silently drop everything after an embedded '\0'.
            connection->AppendInBuffer(std::string(buffer, static_cast<size_t>(n)));
        } else if (n == 0) {
            // Orderly shutdown by the peer.
            lg(Info, "sockfd: %d, client info -> %s:%d quit...", sock,
               connection->_ip.c_str(), connection->_port);
            connection->_except_cb(connection);
            return;
        } else {
            // Non-blocking socket: nothing left to read in this round.
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
            // Interrupted by a signal: retry the call.
            if (errno == EINTR) continue;
            lg(Warning, "sockfd: %d, client info -> %s:%d recv error...", sock,
               connection->_ip.c_str(), connection->_port);
            connection->_except_cb(connection);
            return;
        }
    }
    _OnMessage(connection);
}
Sender 实现
循环发送 _outbuffer 数据,根据是否发送完毕动态调整写事件关注状态。
// Write callback for an accepted connection (edge-triggered).
//
// Sends as much of the connection's output buffer as the socket accepts.
// Afterwards write interest is enabled only if data is still pending, so the
// remainder is flushed once the socket becomes writable again. On a hard
// error the connection's exception callback is invoked.
void Sender(std::shared_ptr<Connection> connection) {
    const int sock = connection->SockFd();
    std::string& outbuffer = connection->OutBuffer();
    // Looping on "buffer not empty" also covers being called with nothing to
    // send: we fall through and clear write interest instead of returning
    // early with stale EPOLLOUT registration.
    while (!outbuffer.empty()) {
        const ssize_t n = send(sock, outbuffer.data(), outbuffer.size(), 0);
        if (n > 0) {
            outbuffer.erase(0, static_cast<size_t>(n));
        } else if (n == 0) {
            // Nothing was accepted; wait for the next writable notification.
            break;
        } else {
            // Kernel send buffer is full: wait for EPOLLOUT.
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
            // Interrupted by a signal: retry the call.
            if (errno == EINTR) continue;
            lg(Warning, "sockfd: %d, client info -> %s:%d send error...", sock,
               connection->_ip.c_str(), connection->_port);
            connection->_except_cb(connection);
            return;
        }
    }
    // Always keep read interest; keep write interest only while data remains.
    EnableEvent(sock, true, !outbuffer.empty());
}
Excepter 实现
移除 epoll 关注,关闭 socket,从哈希表删除连接。
Makefile
需链接 jsoncpp 库。
# Builds the calculator client and the Reactor server; both link jsoncpp.
# Recipe lines must start with a TAB character.
.PHONY: all
all: reactor_client reactor_server

reactor_client: ClientCal.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp

reactor_server: Main.cc
	g++ -o $@ $^ -std=c++11 -ljsoncpp

# The command must be on its own recipe line; written after the colon it
# would be parsed as a list of prerequisites instead of being executed.
.PHONY: clean
clean:
	rm -f reactor_client reactor_server
四、拓展
- 多线程模型:主线程 Reactor 负责 Accept,子线程 Reactor 负责 IO 处理,可接入线程池。
- 连接管理:引入定时器,使用最小堆管理连接过期时间,清理不活跃连接。
- 半同步半异步:真正的 Reactor 模型通常为主线程只负责等待(半同步),子线程负责处理(半异步),以提高并发能力。
五、源代码
Comm.hpp
#pragma once
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include "Socket.hpp"
// Switches `sock` to non-blocking mode, terminating the process with
// NON_BLOCK_ERR if either fcntl call fails.
// `inline` because the function is defined in a header that several
// translation units may include.
inline void SetNonBlockOrDie(int sock) {
    const int fl = fcntl(sock, F_GETFL);
    if (fl < 0) exit(NON_BLOCK_ERR);
    // Setting the flag can fail as well; without this check the caller would
    // carry on with a blocking descriptor, which stalls an edge-triggered loop.
    if (fcntl(sock, F_SETFL, fl | O_NONBLOCK) < 0) exit(NON_BLOCK_ERR);
}
Epoller.hpp
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "nocopy.hpp"
class Epoller : public nocopy {
static const int size = 128;
public:
Epoller() {
_epfd = epoll_create(size);
if (_epfd == -1) {
lg(Error, "epoll_create error: %s", strerror(errno));
} else {
lg(Info, "epoller_create success, epfd: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num, int timeout) {
int n = epoll_wait(_epfd, revents, num, timeout);
return n;
}
int EpollerUpdate(int oper, int sock, uint32_t event) {
int n = 0;
(oper == EPOLL_CTL_DEL) {
n = (_epfd, oper, sock, );
(n == ) (Error, );
} {
epoll_event ev;
ev.data.fd = sock;
ev.events = event;
n = (_epfd, oper, sock, &ev);
(n == ) (Error, );
}
n;
}
~() {
(_epfd >= ) (_epfd);
}
:
_epfd;
};
Log.hpp
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
// Size of each formatting buffer used by Log::operator().
#define SIZE 1024

// Log levels.
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4

// Output destinations selectable through Log::Enable().
#define Screen 1
#define OneFile 2
#define ClassFile 3

// Base file name used by the file destinations.
#define LogFile "log.txt"

// printf-style logger that writes to stdout, to one file, or to one file per
// level, depending on the selected print method.
//
// NOTE(review): most keywords, member names and string literals of this class
// were missing from the listing. Member names (levelToString, printLog,
// printOneFile, printClassFile), the format strings, the "None" fallback and
// the 0666 file mode are reconstructed — confirm against the original source.
class Log {
public:
    Log() { printMethod = Screen; path = "./log/"; }

    // Selects the output destination (Screen, OneFile or ClassFile).
    void Enable(int method) { printMethod = method; }

    ~Log() {}

    // Maps a level constant to its display name.
    std::string levelToString(int level) {
        switch (level) {
        case Info: return "Info";
        case Debug: return "Debug";
        case Warning: return "Warning";
        case Error: return "Error";
        case Fatal: return "Fatal";
        default: return "None";
        }
    }

    // Formats "[level][timestamp] message" and forwards it to printLog.
    // `format` and the variadic arguments follow printf conventions.
    void operator()(int level, const char* format, ...) {
        time_t t = time(nullptr);
        // localtime_r writes into a caller-provided struct instead of a
        // shared static one.
        struct tm now;
        localtime_r(&t, &now);

        char leftbuffer[SIZE];
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]",
                 levelToString(level).c_str(), now.tm_year + 1900,
                 now.tm_mon + 1, now.tm_mday, now.tm_hour,
                 now.tm_min, now.tm_sec);

        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);

        char logtxt[2 * SIZE];
        snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
        printLog(level, logtxt);
    }

    // Routes one formatted line to the configured destination.
    void printLog(int level, const std::string& logtxt) {
        switch (printMethod) {
        case Screen: std::cout << logtxt << std::endl; break;
        case OneFile: printOneFile(LogFile, logtxt); break;
        case ClassFile: printClassFile(level, logtxt); break;
        default: break;
        }
    }

    // Appends the line to path + logname; silently gives up if the file
    // cannot be opened.
    void printOneFile(const std::string& logname, const std::string& logtxt) {
        std::string _logname = path + logname;
        int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
        if (fd < 0) return;
        ssize_t written = write(fd, logtxt.c_str(), logtxt.size());
        (void)written;  // best effort: a failed log write is not reported
        close(fd);
    }

    // Appends the line to a per-level file named "<LogFile>.<level name>".
    void printClassFile(int level, const std::string& logtxt) {
        std::string filename = LogFile;
        filename += ".";
        filename += levelToString(level);
        printOneFile(filename, logtxt);
    }

private:
    int printMethod;
    std::string path;
};

// Global logger instance used throughout the project as lg(level, fmt, ...).
Log lg;
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
// Separator between fields inside a payload.
const std::string black_space_sep = " ";
// Separator that frames a package: "<len>\n<content>\n".
const std::string protocol_sep = "\n";

// Wraps `content` into one framed package: decimal length, separator,
// content, separator.
inline std::string Encode(const std::string& content) {
    std::string package = std::to_string(content.size());
    package += protocol_sep;
    package += content;
    package += protocol_sep;
    return package;
}

// Tries to extract one complete package from the front of `package`.
//
// On success the payload is appended to *content, the consumed bytes are
// removed from `package`, and true is returned. Returns false and leaves both
// arguments untouched when the package is incomplete or when the length
// prefix is not a plain decimal number of at most 9 digits.
// NOTE(review): a malformed prefix stays in the buffer, so the caller should
// treat repeated failures on a growing buffer as a protocol error.
inline bool Decode(std::string& package, std::string* content) {
    const size_t pos = package.find(protocol_sep);
    if (pos == std::string::npos) return false;

    // The prefix comes from the network: validate it before converting so a
    // garbage or oversized value cannot throw out of std::stoi.
    const std::string len_str = package.substr(0, pos);
    if (len_str.empty() || len_str.size() > 9) return false;
    for (char c : len_str) {
        if (c < '0' || c > '9') return false;
    }
    const size_t len = static_cast<size_t>(std::stoi(len_str));

    // prefix + separator + payload + separator
    const size_t total_len = len_str.size() + protocol_sep.size() + len + protocol_sep.size();
    if (package.size() < total_len) return false;

    *content += package.substr(pos + protocol_sep.size(), len);
    package.erase(0, total_len);
    return true;
}
class Request {
:
( data1, data2, oper) : (data1), (data2), (oper) {}
() {}
{
std::string s = std::(x);
s += black_space_sep;
s += op;
s += black_space_sep;
s += std::(y);
*out = s;
;
Json::Value root;
root[] = x;
root[] = y;
root[] = op;
Json::StyledWriter w;
*out = w.(root);
;
}
{
left = in.(black_space_sep);
(left == std::string::npos) ;
std::string part_x = in.(, left);
right = in.(black_space_sep);
(right == std::string::npos) ;
std::string part_y = in.(right + );
(left + != right) ;
x = std::(part_x);
y = std::(part_y);
op = in[left + ];
;
Json::Value root;
Json::Reader r;
r.(in, root);
x = root[].();
y = root[].();
op = root[].();
;
}
{
std::cout << << x << op << y << << std::endl;
}
:
x;
y;
op;
};
{
:
( res, c) : (res), (c) {}
() {}
{
std::string s = std::(result);
s += black_space_sep;
s += std::(code);
*out = s;
;
Json::Value root;
root[] = result;
root[] = code;
Json::StyledWriter w;
*out = w.(root);
;
}
{
pos = in.(black_space_sep);
(pos == std::string::npos) ;
std::string part_left = in.(, pos);
std::string part_right = in.(pos + );
result = std::(part_left);
code = std::(part_right);
;
Json::Value root;
Json::Reader r;
r.(in, root);
result = root[].();
code = root[].();
;
}
{
std::cout << << result << << code << std::endl;
}
:
result;
code;
};
TcpServer.hpp
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <cerrno>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Comm.hpp"
class Connection;
class TcpServer;

// Event masks handed to epoll. EPOLLET is OR-ed in so the descriptor is
// monitored in edge-triggered mode.
// Declared `static const` so that including this header from more than one
// translation unit does not produce duplicate definitions, and so the masks
// cannot be modified by accident at runtime.
static const uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
static const uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

// Size in bytes of the stack buffer used for a single recv() call.
static const int g_buffer_size = 128;

// Common signature of the read / write / exception callbacks stored on a
// Connection; the callback receives the connection it was triggered for.
using func_t = std::function<void(std::shared_ptr<Connection>)>;
{
:
( sock, std::weak_ptr<TcpServer> tcp_server_ptr)
: _sock(sock), _tcp_server_ptr(tcp_server_ptr) {}
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
{ _inbuffer += info; }
{ _outbuffer += info; }
{ _inbuffer; }
{ _outbuffer; }
{ _sock; }
~() {
(_sock > ) (_sock);
}
:
_sock;
std::string _inbuffer;
std::string _outbuffer;
:
_recv_cb;
_send_cb;
_except_cb;
std::weak_ptr<TcpServer> _tcp_server_ptr;
std::string _ip;
_port;
};
: std::enable_shared_from_this<TcpServer>, nocopy {
num = ;
:
( port, OnMessage)
: _quit(), _port(port), _listensock_ptr( ()),
_epoller_ptr( ()), _OnMessage(OnMessage) {}
{
_listensock_ptr->();
(_listensock_ptr->());
_listensock_ptr->(_port);
_listensock_ptr->();
(Info, , _listensock_ptr->());
(_listensock_ptr->(), EVENT_IN,
std::(&TcpServer::Accepter, , std::placeholders::_1),
, );
}
{
;
new_connection->(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
_connections.(std::(sock, new_connection));
_epoller_ptr->(EPOLL_CTL_ADD, sock, events);
(Debug, , sock);
}
{
() {
peer;
len = (peer);
sock = ::(connection->(), ( sockaddr*)&peer, &len);
(sock >= ) {
port = (peer.sin_port);
ip[];
(AF_INET, &(peer.sin_addr), ip, (ip));
(Debug, , ip, port, sock);
(sock);
(sock, EVENT_IN,
std::(&TcpServer::Recver, , std::placeholders::_1),
std::(&TcpServer::Sender, , std::placeholders::_1),
std::(&TcpServer::Excepter, , std::placeholders::_1),
ip, port);
} {
(errno == EWOULDBLOCK) ;
(errno == EINTR) ;
;
}
}
}
{
sock = connection->();
buffer[g_buffer_size];
() {
(buffer, , (buffer));
n = (sock, buffer, (buffer) - , );
(n > ) {
connection->(buffer);
} (n == ) {
(Info, , sock,
connection->_ip.(), connection->_port);
connection->_except_cb(connection);
;
} {
(errno == EWOULDBLOCK) ;
(errno == EINTR) ;
{
(Warning, , sock,
connection->_ip.(), connection->_port);
connection->_except_cb(connection);
;
}
}
}
_OnMessage(connection);
}
{
sock = connection->();
std::string& outbuffer = connection->();
() {
n = (sock, outbuffer.(), outbuffer.(), );
(n > ) {
outbuffer.(, n);
(outbuffer.()) ;
} (n == ) {
;
} {
(errno == EWOULDBLOCK) ;
(errno == EINTR) ;
{
(Warning, , sock,
connection->_ip.(), connection->_port);
connection->_except_cb(connection);
;
}
}
}
(!outbuffer.()) {
(sock, , );
} {
(sock, , );
}
}
{
fd = connection->();
(Warning, ,
fd, connection->_ip.(), connection->_port);
_epoller_ptr->(EPOLL_CTL_DEL, fd, );
(Debug, , fd);
(fd);
(Debug, , fd);
_connections.(fd);
}
{
events = ;
events |= ((readable == ? EPOLLIN : ) |
(writeable == ? EPOLLOUT : ) | EPOLLET);
_epoller_ptr->(EPOLL_CTL_MOD, sock, events);
}
{
iter = _connections.(fd);
(iter == _connections.()) ;
;
}
{
n = _epoller_ptr->(_revs, num, timeout);
( i = ; i < n; i++) {
sock = _revs[i].data.fd;
events = _revs[i].events;
((events & EPOLLERR) | (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
((events & EPOLLIN) && (sock)) {
(_connections[sock]->_recv_cb)
_connections[sock]->_recv_cb(_connections[sock]);
}
((events & EPOLLOUT) && (sock)) {
(_connections[sock]->_send_cb)
_connections[sock]->_send_cb(_connections[sock]);
}
}
}
{
_quit = ;
(!_quit) {
();
();
}
_quit = ;
}
{
std::cout << << std::endl;
(& connection : _connections) {
std::cout << connection.first << ;
std::cout << << connection.second->() << std::endl;
}
std::cout << std::endl;
}
~() {}
:
_quit;
_port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<, std::shared_ptr<Connection>> _connections;
_revs[num];
_OnMessage;
};
总结
本文详细介绍了基于 ET 模式 epoll 的 Reactor 模型 TCP 服务器的完整实现过程,涵盖了从基础框架搭建到业务协议集成的各个阶段。通过合理设计 Connection 类和 TcpServer 类,解决了循环引用和非阻塞 IO 处理等关键问题,实现了高效稳定的网络服务架构。