【C++仿Muduo库#3】Server 服务器模块实现上
📃个人主页:island1314
⛺️ 欢迎关注:👍点赞 👂🏽留言 😍收藏 💞 💞 💞
- 生活总是不会一帆风顺,前进的道路也不会永远一马平川,如何面对挫折影响人生走向 – 《人民日报》
🔥 目录
一、Buffer 模块
本质:缓冲区模板
功能:存储数据,取出数据
实现思想:
- 实现缓冲区得有一块内存空间,采用
vector<char>vector 底层其实使用的是一个线性的内存空间 - 要素:
- 默认的空间大小
- 当前的读取数据位置
- 当前的写入数据位置
- 操作:
- 写入数据:当前写入位置指向哪里,就从哪里开始写入,如果后续剩余空闲空间不够了,这个就考虑征途缓冲区空闲空间是否足够(因为 读位置也会向后偏移,也就是说前面也可能有空闲空间)
- 足够:将数据移动到起始位置即可
- 不够:扩容,从当前写位置开始扩容足够大小
- 数据一旦写入成功,当前写位置,就要往后偏移
- 读取数据:当前读取位置指向哪里,就从哪里开始读取,前提是有数据可读
- 可读数据大小:当前写入位置 - 当前读取位置
- 写入数据:当前写入位置指向哪里,就从哪里开始写入,如果后续剩余空闲空间不够了,这个就考虑征途缓冲区空闲空间是否足够(因为 读位置也会向后偏移,也就是说前面也可能有空闲空间)
设计如下:
classBuffer{public:// 1. 获取当前写位置地址// 2. 确保可写空间足够// 3. 获取前沿空间大小 // 4. 获取后沿空间大小 // 5. 将写位置向后移动指定长度 // 6. 获取当前读位置地址 // 7. 获取可读数据大小// 8. 将读位置向后移动指定长度// 9. 清理功能 private: std::vector<char> _buffer;// 位置, 是一个相对偏移量, 而不是绝对地址uint64_t _read_idx;// 相对读偏移uint64_t _write_idx;// 相对写偏移};代码如下:
class Buffer{ private: // 注意: 这里的起始地址:_buffer.data() 或者 &*_buffer.begin() 都可以 char *Begin(){return &*_buffer.begin();} // 读写数据 void ReadData(void *data, uint64_t len) { assert(len <= ReadableSize()); std::copy(GetReadPos(), GetReadPos() + len, (char*)data); // MoveReadOffset(len); } void WriteData(const void *data, uint64_t len) { // 1. 确保可写空间足够 2. 拷贝数据 if(len <= 0) return ; EnsureWriteSpace(len); const char *d = static_cast<const char *>(data); std::copy(d, d + len, GetWritePos()); // 把 data 复制到 缓冲区 // MoveWriteOffset(len); } void WriteBuffer(Buffer &buf) { return WriteData(buf.GetReadPos(), buf.ReadableSize()); } void WriteString(const std::string &str) { return WriteData(str.c_str(), str.size()); } public: Buffer(uint64_t size = 1024): _reader_idx(0), _writer_idx(0){ _buffer.resize(size); } // 获取当前读写位置地址 char *GetWritePos() {return Begin() + _writer_idx;} char *GetReadPos(){return Begin() + _reader_idx;} // 将读写位置向后移动指定长度 void MoveReadOffset(uint64_t len){ assert(len <= ReadableSize()); _reader_idx += len; } void MoveWriteOffset(uint64_t len){ assert(len <= BufferHeadSize() + BufferTailSize()); _writer_idx += len; } // 获取一行数据 std::string GetLine(){ char *pos = FindCRLF(); if (pos == nullptr) { return ""; } return ReadAsString(pos - ReadPos() + 1); } // 获取缓冲区末尾空闲空间大小 -- 写偏移之后的空闲空间 uint64_t BufferTailSize() {return _buffer.size() - _writer_idx;} // 获取缓冲区起始空闲空间大小 -- 读偏移之前的空闲空间 uint64_t BufferHeadSize(){return _reader_idx;} // 获取可读数据大小 uint64_t ReadableSize() {return _writer_idx - _reader_idx;} // 确保可写空间足够 (移动 / 扩容) void EnsureWriteSpace(uint64_t len) { // 1. 末尾空闲空间大小足够, 直接返回 if(BufferTailSize() >= len) return; // 2. 先移动读偏移 if (len <= BufferHeadSize() + BufferTailSize()) { // 3. 空闲空间足够, 数据移动到起始位置 uint64_t readable_size = ReadableSize(); // 保存当前数据大小 // std::memmove(_buffer.data(), _buffer.data() + _reader_idx, ReadableSize()); std::copy(GetReadPos(), GetReadPos() + readable_size, Begin()); // 将可读数据保存到起始位置 // 更新读写偏移 _writer_idx = readable_size; _reader_idx = 0; } else { // 4. 扩容 uint64_t new_size = _buffer.size() * 2; // 避免持续扩容 while (new_size < len) { new_size *= 2; } _buffer.resize(new_size); } } void WriteAndPush(const void *data, uint64_t len) { WriteData(data, len); MoveWriteOffset(len); } void WriteStringAndPush(const std::string &str) { WriteString(str); MoveWriteOffset(str.size()); } void WriteBufferAndPush(Buffer &buf) { WriteBuffer(buf); MoveWriteOffset(buf.ReadableSize()); } void ReadAndPop(void *buf, uint64_t len) { ReadData(buf, len); MoveReadOffset(len); } std::string ReadAsString(uint64_t len){ assert(len <= ReadableSize()); std::string str; str.resize(len); ReadData(&str[0], len); return str; } std::string ReadAsStringAndPop(uint64_t len){ std::string str = ReadAsString(len); MoveReadOffset(len); return str; } char *FindCRLF(){ char *res = (char*)std::memchr(GetReadPos(), '\n', ReadableSize()); return res; } std::string GetLineAndPop(){ std::string str = GetLine(); MoveReadOffset(str.size()); return str; } // 9. 清空缓冲区 void clear(){ _buffer.clear(); _reader_idx = 0; _writer_idx = 0; } private: std::vector<char> _buffer; // 使用 vector 进行内存空间管理 // 位置, 是一个相对偏移量, 而不是绝对地址 uint64_t _reader_idx; // 相对读偏移 uint64_t _writer_idx; // 相对写偏移 }; 测试代码如下
intmain(){ Buffer buffer;for(int i =0; i <300; i++){ std::string str ="hello world"+ std::to_string(i)+"\n"; buffer.WriteStringAndPush(str);}while(buffer.GetReadableSize()>0){ std::string line = buffer.GetLineAndPop(); std::cout <<"Line: "<< line << std::endl;}// // std::string str = "hello world"; // // buffer.WriteStringAndPush(str);// std::cout << "Buffer size: " << buffer.GetReadableSize() << std::endl;// std::cout << "Buffer content: " << buffer.ReadAsStringAndPop(buffer.GetReadableSize()) << std::endl;// buffer.WriteStringAndPush("hello world\n");// std::string tmp = buffer.ReadAsStringAndPop(buffer.GetReadableSize());// std::cout << "tmp: " << tmp << std::endl;// std::cout << "Buffer size: " << buffer.GetReadableSize() << std::endl;return0;}二、日志模块
详情可以参考我的这篇文章:
#defineINF0#defineDBG1#defineERR2#defineLOG_LEVEL-1#defineLOG(level, format,...)do{\if(level < LOG_LEVEL)break;\time_t t =time(nullptr);\structtm*tm =localtime(&t);\char buf[64];\strftime(buf,sizeof(buf)-1,"%Y-%m-%d %H:%M:%S", tm);\printf("%s [%s:%d] "format "\n", buf,__FILE__,__LINE__,##__VA_ARGS__);\}while(0)#defineLOG_INFO(format,...)LOG(INF, format,##__VA_ARGS__)#defineLOG_DEBUG(format,...)LOG(DBG, format,##__VA_ARGS__)#defineLOG_ERROR(format,...)LOG(ERR, format,##__VA_ARGS__)三、套接字 Socket 设计
这个可以参考我之前的这篇文章:
1. 代码实现
// Socket 类#defineMAXLISTEN1024classSocket{private:// 1.创建套接字boolCreate(){ _sockfd =socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if(_sockfd <0){LOG_ERROR("CREATE SOCKET ERROR");returnfalse;}returntrue;}// 2.绑定地址信息boolBind(const std::string &ip,uint16_t port){structsockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port =htons(port); addr.sin_addr.s_addr =inet_addr(ip.c_str()); socklen_t len =sizeof(structsockaddr_in);int ret =bind(_sockfd,(structsockaddr*)&addr, len);if(ret <0){LOG_ERROR("BIND SOCKET ERROR");returnfalse;}returntrue;}// 3.监听boolListen(int backlog = MAXLISTEN){int ret =listen(_sockfd, backlog);if(ret <0){LOG_ERROR("LISTEN SOCKET ERROR");returnfalse;}returntrue;}// 4.向服务器发起连接boolConnect(const std::string &ip,uint16_t port){structsockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port =htons(port); addr.sin_addr.s_addr =inet_addr(ip.c_str()); socklen_t len =sizeof(structsockaddr_in);int ret =connect(_sockfd,(structsockaddr*)&addr, len);if(ret <0){LOG_ERROR("CONNECT SOCKET ERROR");returnfalse;}returntrue;}public:Socket():_sockfd(-1){}~Socket(){Close();}Socket(int sockfd):_sockfd(sockfd){}// 避免拷贝问题Socket(const Socket&)=delete; Socket&operator=(const Socket&)=delete;intFd()const{return _sockfd;}// 5.获取新连接intAccept(){int newfd =accept(_sockfd,nullptr,nullptr);if(newfd <0){LOG_ERROR("ACCEPT SOCKET ERROR");return-1;}return newfd;// 返回新连接的套接字}// 6.接收数据 ssize_t Recv(void*buf, size_t len,int flag =0){ ssize_t ret =recv(_sockfd, buf, len, flag);// < 0 出错// = 0 连接断开// > 0 接收成功if(ret <=0){// EAGAIN | EWOULDBLOCK: 当前 socket 的非阻塞缓冲区没有数据了// EINTR: 当前 socket 的阻塞等待被信号中断// ECONNRESET: 连接重置// ENOTCONN: 套接字未连接// ETIMEDOUT: 接收超时if(errno == EAGAIN || errno == EINTR){return0;// 表示: 这次发送没有发送成功, 需重试}LOG_ERROR("Recv SOCKET %s",strerror(errno));return-1;// 其他错误}return ret;// 实际接收长度} ssize_t NonBlockRecv(void*buf, size_t len){returnRecv(buf, len, MSG_DONTWAIT);// MSG_DONTWAIT 表示当前接收为非阻塞。}// 7.发送数据 ssize_t Send(constvoid* buf, size_t len,int flag =0){ ssize_t ret =send(_sockfd, buf, len, flag);if(ret <0){if(errno == EAGAIN || errno == EINTR){return0;// 表示: 这次发送没有发送成功, 需重试}LOG_ERROR("SEND SOCKET %s",strerror(errno));return-1;// 其他错误 }return ret;} ssize_t NonBlockSend(constvoid* buf, size_t len){if(len ==0)return0;returnSend(buf, len, MSG_DONTWAIT);// MSG_DONTWAIT 表示当前发送为非阻塞}// 8.关闭套接字voidClose(){if(_sockfd !=-1){close(_sockfd); _sockfd =-1;}}// 9.设置套接字选项 -- 开启地址端口重用voidReuseAddr(){int opt =1;// SO_REUSEADDR: 允许重用本地地址和端口// SO_REUSEPORT: 允许重用本地端口setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,(void*)&opt,sizeof(opt)); opt =1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT,(void*)&opt,sizeof(opt));}// 10.设置套接字阻塞属性 -- 设置为非阻塞voidNonBlock(){int flag =fcntl(_sockfd, F_GETFL,0);if(flag ==-1){LOG_ERROR("GET SOCKET FLAG ERROR");return;}int ret =fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);if(ret <0){LOG_ERROR("SET SOCKET NONBLOCK ERROR");return;}}// 9. 创建一个服务器连接boolCreateServer(uint16_t port,const std::string &ip ="0.0.0.0",bool nonblock_flag =false){if(!Create())returnfalse;if(nonblock_flag)NonBlock();// 设置非阻塞ReuseAddr();// 设置地址端口重用if(!Bind(ip, port))returnfalse;if(!Listen())returnfalse;returntrue;}// 10. 创建一个客户端连接boolCreateClient(uint16_t port,const std::string &ip){// 1. 创建套接字 2. 连接服务器if(!Create())returnfalse;if(!Connect(ip, port))returnfalse;returntrue;}private:int _sockfd;};2. 代码检测
server.cpp
intmain(){ Socket lst_sock; lst_sock.CreateServer(8080);while(1){int newfd = lst_sock.Accept();if(newfd <0){continue;} Socket cli_sock(newfd);char buf[1024]={0};int ret = cli_sock.Recv(buf,1023);if(ret <0){ cli_sock.Close();continue;} cli_sock.Send(buf, ret); cli_sock.Close();} lst_sock.Close();return0;}client.cpp
intmain(){ Socket cli_sock; cli_sock.CreateClient(8080,"127.0.0.1"); cli_sock.Send("Hello IsLand",strlen("Hello IsLand"));char buf[1024]={0}; cli_sock.Recv(buf,1023);LOG_DEBUG("recv data: %s", buf); cli_sock.Close();return0;}结果如下:
lighthouse@VM-8-10-ubuntu:Test1$ ./client 2025-05-02 10:48:41 [tcp_cli.cc:11] recv data: Hello IsLand 3. 细节处理
细节1:处理 Recv 函数时, errno 的来源以及 为啥不用 EWOULDBLOCK
① errno 的来源
errno是 C标准库 中定义的全局变量(线程安全环境下由__errno_location()实现),用于存储系统调用或库函数失败时的错误码。- 注意:
recv返回-1时,错误码需通过errno获取,而非直接从ret的值推断
当 recv 返回 -1 时,表示发生错误,具体的错误原因会通过 errno 变量传递(如 EAGAIN, EINTR 等)
if(ret <=0){if(errno == EAGAIN || errno == EINTR){return0;// 表示非致命错误,继续尝试接收}}② EAGAIN 和 EWOULDBLOCK 的关系
在 Linux 系统 中,EAGAIN 和 EWOULDBLOCK 的值是相同的(均为 11),定义在 /usr/include/asm-generic/errno-base.h 中:
#defineEAGAIN11/* Try again */#defineEWOULDBLOCKEAGAIN /* Operation would block */细节2:MSG_DONWAIT 的概述
在 NonBlockRecv 和 NonBlockSend函数中,使用了 MSG_DONTWAIT 标志来实现 非阻塞接收 (Non-blocking Receive),这是网络编程中一种常见的异步通信机制。以下是对其工作原理和用途的详细解析:
MSG_DONTWAIT是recv系统调用的一个标志位,用于 临时启用非阻塞模式 。- 它的作用是:即使当前套接字(socket)本身是阻塞模式(默认行为),也会让本次
recv调用立即返回,而不是等待数据到达。 - 如果此时接收缓冲区中没有数据,
recv会返回-1,并将errno设置为EAGAIN或EWOULDBLOCK(两者等价),表示“暂时无数据,稍后再试”
细节3:关于 ReuseAddr()
SO_REUSEADDR:安全复用,允许同一地址和端口被多个套接字绑定(常用于快速重启服务)SO_REUSEPORT:多进程共享端口,允许多个套接字绑定到完全相同的地址和端口(需所有套接字均设置此选项),用于负载均衡- 建议 :若无需多进程/线程共享端口,仅保留
SO_REUSEADDR
那么之前 我们说过如下:
- 在 TCP/IP 协议中,一个 IP 地址和端口组合(即 socket 地址)默认情况下只能被一个 socket 绑定 ,这是为了防止多个进程同时监听同一个端口,造成数据混乱。但在某些特殊场景下,我们确实需要“复用”端口,这就引入了
SO_REUSEADDR和SO_REUSEPORT选项。
📌 为什么默认不允许端口复用?
- 每个 TCP/UDP 连接由五元组唯一标识:{协议, 源IP, 源端口, 目的IP, 目的端口}
- 其中,服务器监听的 socket 地址(即
bind()的地址)决定了它能接收哪些连接。如果多个 socket 绑定到相同的地址和端口,系统将无法判断哪个 socket 应该处理新连接,从而导致冲突。
因此,默认情况下,系统禁止两个 socket 绑定到相同的地址和端口(之前在这篇【Linux网络#2】: Socket 编程 就提过一个端口号只能被一个进程占用,但是一个进程能够绑定多个端口)
① SO_REUSEADDR:允许“安全”的端口复用
✅ 用途
- 快速重启服务 :当服务意外崩溃或正常关闭后,TCP 连接可能仍处于
TIME_WAIT状态(通常持续 2MSL,约 60 秒),此时端口仍被占用。 - 避免“Address already in use”错误 :通过设置
SO_REUSEADDR,可以让服务在TIME_WAIT状态期间重新绑定到端口。
🧠 原理
SO_REUSEADDR允许绑定到已被其他 socket 使用的地址,但前提是:- 该 socket 已关闭(即不再有活跃连接)。
- 或者该 socket 也设置了
SO_REUSEADDR。
- 内核会检查当前是否有活跃连接,如果没有,则允许复用。
② SO_REUSEPORT:允许多个 socket 同时绑定到相同地址和端口
✅ 用途
- 负载均衡 :多个进程或线程可以同时监听相同的地址和端口,内核负责将连接均匀分配给它们。
- 高并发场景 :适用于需要并行处理大量连接的服务(如 Web 服务器)。
🧠 原理
- 多个 socket 可以绑定到相同的地址和端口,但所有 socket 必须都设置
SO_REUSEPORT。 - 内核会使用一种机制(如哈希算法)将连接请求分发给各个 socket。
⚠️ 注意事项
SO_REUSEPORT是 Linux 3.9+ 引入的特性,旧版本系统不支持。- 不同 socket 之间的负载均衡策略依赖内核实现,不同系统行为可能不同。
🧩 为什么 SO_REUSEADDR 能绕过“一个端口只能被一个进程占用”?
虽然 TCP/IP 协议规定一个 socket 地址只能被一个 socket 绑定,但 SO_REUSEADDR 是一个“例外规则”,它允许在特定条件下复用地址。
✅ 条件如下:
- 原 socket 已关闭 :即没有活跃连接。
- 新 socket 设置了
SO_REUSEADDR。 - 原 socket 也设置了
SO_REUSEADDR(可选)。
在这种情况下,内核认为复用是“安全”的,不会导致连接混乱,因此允许绑定。
🧠 举个例子:服务重启时的 TIME_WAIT 问题
这个情况之前在 【Linux网络#11】: 传输层协议 TCP 四次挥手的内容下也提过
假设你写了一个 TCP 服务器,监听在 0.0.0.0:8080。当你关闭服务器后立即重启,可能会遇到如下错误:
bind: Address already in use 这是因为:
- 关闭连接后,TCP 连接进入
TIME_WAIT状态(持续 2MSL,约 60 秒),以确保所有残留数据包都被丢弃。 - 在此期间,端口仍被占用,系统不允许新 socket 绑定。
解决办法 :在 bind() 之前设置 SO_REUSEADDR,这样即使端口仍处于 TIME_WAIT 状态,也可以绑定成功。
🧾小结
| 特性 | SO_REUSEADDR | SO_REUSEPORT |
|---|---|---|
| 是否允许多个 socket | 否(默认) | 是 |
| 是否需要所有 socket 设置 | 否 | 是 |
| 用途 | 快速重启服务 | 多进程/线程负载均衡 |
| 是否破坏唯一性 | 否(仅在安全条件下复用) | 是(允许多个 socket 监听相同地址) |
| 是否跨平台 | 高度支持 | Linux 3.9+ |
| 是否影响连接分发 | 否 | 是(内核分发连接) |
✅ 推荐使用方式
- 普通服务重启 :使用
SO_REUSEADDR,避免TIME_WAIT导致的绑定失败。 - 高并发负载均衡 :使用
SO_REUSEPORT,多个进程/线程共享端口,提升性能。 - 避免滥用 :除非明确需要,否则不要同时设置两者,防止行为不可预测。
虽然 TCP/IP 协议规定一个端口只能被一个 socket 绑定,但SO_REUSEADDR和SO_REUSEPORT提供了“例外”机制,分别用于 安全复用 和多进程共享端口
细节4:宏污染
由于最开始的时候,我的日志实现代码 和 测试代码都用了相同的 局部变量 char buf
- 日志定义了一个局部变量
char buf[64],与server.cpp和client.cpp中的char buf[1024]同名但作用域不同 - 虽然从语法上看,宏中的
buf是局部变量,不会影响外部的buf,但在某些编译器或特定优化条件下,栈内存的布局可能会导致buf被意外覆盖 。
// LOG_DEBUG 宏定义char buf[64];// 与 server.cpp 和 client.cpp 中的 char buf[1024] 同名当客户端调用LOG_DEBUG("recv data: %s", buf)时,宏展开后会生成一个char buf[64],用于存储时间戳字符串。
如果编译器在栈上分配内存时,将宏内的buf和外部的buf紧邻存放,printf的格式化输出可能会溢出到外部的buf中 ,导致接收到的数据被覆盖为时间戳字符串。
这样就导致我输出了如下的数据结果:
四、Channel 类设计
目的:对描述符的监控事件管理
功能:
- 事件管理:描述符是否可读写,对描述符的监控可读可写,解除 事件 监控
- 事件触发后处理的管理
- 需要处理的事件:可读,可写,挂断,错误,任意
- 事件处理的回调函数
成员:
epoll进行事件监控EPoollIN:可读EPoollOUT:可写EPoollRDHUP:连接断开EPoollPRI:优先数据EPoollERR:出错EPoollHUP:挂断
1. 代码实现
classChannel{public:using EventCallback = std::function<void()>;// 注意: 这里不能放在 private 中, 否则会报错explicitChannel(int fd):_fd(fd),_events(0),_revents(0){}// 显式调用 ~Channel(){if(_fd !=-1){close(_fd); _fd =-1;// 避免重复关闭}}intFd()const{return _fd;}uint32_tEvents()const{return _events;}// 判断当前事件是否可读写boolReadAble()const{return(_events & EPOLLIN);}boolWriteAble()const{return(_events & EPOLLOUT);}// 设置回调函数voidSetReadCallback(const EventCallback& cb){ _read_callback = cb;}voidSetWriteCallback(const EventCallback& cb){_write_callback = cb;}voidSetCloseCallback(const EventCallback& cb){_close_callback = cb;}voidSetErrorCallback(const EventCallback& cb){_error_callback = cb;}voidSetEventCallback(const EventCallback& cb){_event_callback = cb;}/* 监控事件开关 -- 进行事件监控连接后, 描述符就绪事件, 设置实际就绪事件*/voidSetREvents(uint32_t events){ _revents = events;}/* 开启事件监控 */voidEnableRead(){ _events |= EPOLLIN;}voidEnableWrite(){ _events |= EPOLLOUT;}/* 关闭事件监控 */voidDisableRead(){ _events &=~EPOLLIN;}voidDisableWrite(){ _events &=~EPOLLOUT;}voidDisableAll(){_events =0;}// 关闭所有事件/* 后面调用 Poller 和 EventLoop 接口来移除事件监控 */voidRemove(){}// 移除事件voidUpdate(){}// 更新事件voidHandleEvent(){// 处理事件, 判断连接触发了什么事件 // 实际项目中, 连接断开并不会直接先断开, 还是看到是否有数据可读, 先读完数据或者发送出错再断开if((_revents & EPOLLIN)||(_revents & EPOLLRDHUP)||(_revents & EPOLLPRI)){// 不管任何事件, 都会调用的回调函数if(_event_callback)_event_callback();if(_read_callback)_read_callback();}elseif(_revents & EPOLLOUT){// 不管任何事件, 都会调用的回调函数if(_event_callback)_event_callback();if(_write_callback)_write_callback();}elseif(_revents & EPOLLERR){if(_error_callback)_error_callback();}elseif(_revents & EPOLLHUP){if(_close_callback)_close_callback();}}private:int _fd;// 事件对应的文件描述符uint32_t _events;// 当前需要监控的事件uint32_t _revents;// 当前连接触发的事件 EventCallback _read_callback;// 读事件回调 EventCallback _write_callback;// 写事件回调 EventCallback _close_callback;// 关闭事件回调 EventCallback _error_callback;// 错误事件回调 EventCallback _event_callback;// 任意事件回调};2. 细节处理
细节1:在 HandleEvent 函数中使用 if-else if 结构而非多个独立的 if
使用 if-else if 是为了保障资源安全、明确事件优先级 ,并避免因同时处理多个事件导致的未定义行为
① 事件优先级和互斥性
- 错误和挂起事件的优先级更高
EPOLLERR(错误)和EPOLLHUP(挂起)通常表示连接出现严重问题(如对端关闭、网络中断),需要立即处理
如果这些事件与读写事件同时发生,应优先处理错误/挂起事件,避免在无效连接上继续执行读写操作 - 互斥处理:某些事件的处理逻辑可能互斥。例如:
- 写事件(
EPOLLOUT)处理可能触发连接关闭,导致后续的错误/挂起事件处理访问已释放的对象 - 错误/挂起事件处理通常直接终止连接,无需再处理读写
- 写事件(
② 资源安全:避免访问已释放对象
- 写事件处理可能释放连接
在注释中明确指出:“有可能会释放连接的操作事件,一次只处理一个”
若写事件的回调(如_write_callback)中关闭了连接(如close(fd)或删除Channel对象),后续的错误/挂起事件处理若继续执行,可能导致访问已释放资源 (如空指针、无效文件描述符) - 使用
else if阻断后续逻辑
通过else if结构,确保一旦处理了写事件,错误/挂起事件将不再被处理,从而避免潜在的资源管理问题
③ 逻辑清晰和可维护
- 明确事件处理顺序:
if-else if结构清晰表达了事件处理的优先级顺序 ,使代码更易理解和维护 - 避免冗余判断:若多个事件同时发生(如
EPOLLOUT | EPOLLERR),使用else if可避免重复判断和执行无关逻辑,提升效率。
那么什么时候可以使用独立的 if 呢 ???
若多个事件的处理逻辑相互独立且不会导致对象销毁或状态变化 ,可以使用独立的 if 语句,例如
if(_revents & EPOLLIN){/* 读事件 */}if(_revents & EPOLLOUT){/* 写事件 */}此时,即使同时触发读和写事件,两者都会被处理,且不会互相干扰
五、Poller 模块实现
意义:通过 epoll 实现对描述符的 IO 事件监控
功能:
- 添加 / 修改描述符的事件监控(不存在则添加,存在则修改)
- 移除描述符的事件监控
封装思想
- 必须拥有一个 epoll 的操作句柄
- 拥有一个
struct epoll_event的结构数组,监控时保存所有的活跃事件 - 使用 hash 表管理描述符与描述符对应的事件管理
Channel对象 - 逻辑流程
- 对描述符进行监控,通过
Channel才能知道描述符需要监控的事件 - 当描述符就绪了,通过描述符在 hash 表中找到对应的 Channel(得到了 Channel 才能什么事件如何处理)
- 当描述符就绪了,返回就绪描述符对应的 Channel
- 对描述符进行监控,通过
1. 代码实现
#defineMAX_EPOLLEREVENTS1024classPoller{private:// 对 epoll 的之间操作voidUpdate(Channel* channel,int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int fd = channel->Fd();structepoll_event event; event.data.fd = fd;// 事件对应的文件描述符 event.events = channel->Events();// 需要监控的事件int ret =epoll_ctl(_epfd, op, fd,&event);if(ret <0){LOG_ERROR("EPOLL_CTL ERROR");}return;}// 判断一个 Channel 是否已经添加了 事件监控boolHasChannel(Channel* channel){return _channels.find(channel->Fd())!= _channels.end();}public:Poller(){// _epfd = epoll_create(MAX_EPOLLEREVENTS); // 这个已经过时 _epfd =epoll_create1(EPOLL_CLOEXEC);// 创建 epoll 文件描述符if(_epfd <0){LOG_ERROR("EPOLL_CREATE ERROR");abort();// 退出程序}}Poller(const Poller&)=delete; Poller&operator=(const Poller&)=delete;// 添加 / 修改 监控事件voidUpdateEvent(Channel* channel){bool ret =HasChannel(channel);if(!ret){// 不存在, 添加事件 _channels.insert(std::make_pair(channel->Fd(), channel));// 添加到映射表returnUpdate(channel, EPOLL_CTL_ADD);}returnUpdate(channel, EPOLL_CTL_MOD);// 已经存在, 修改事件}voidRemoveEvent(Channel* channel){auto it = _channels.find(channel->Fd());if(it != _channels.end()){ _channels.erase(it);// 从映射表中删除}Update(channel, EPOLL_CTL_DEL);// 删除事件}// 开始监控, 返回活跃连接voidPoll(std::vector<Channel*>*active){// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);int nfds =epoll_wait(_epfd, _events, MAX_EPOLLEREVENTS,-1);if(nfds <0){if(errno == EINTR){return;// 被信号中断}LOG_ERROR("EPOLL_WAIT ERROR: %s\n",strerror(errno));abort();// 退出程序}// 遍历活跃连接for(int i =0; i < nfds;++i){int fd = _events[i].data.fd;auto it = _channels.find(fd);assert(it != _channels.end());// 断言: 事件一定存在 Channel* channel = it->second; channel->SetREvents(_events[i].events);// 设置当前事件 active->push_back(channel);// 添加到活跃连接列表}}private:int _epfd;// epoll 文件描述符structepoll_event _events[MAX_EPOLLEREVENTS];// epoll 事件数组 std::unordered_map<int, Channel*> _channels;// fd -> Channel 映射表};2. 细节处理
细节1:epoll 是水平触发(LT)还是边缘触发(ET)?区别是什么?
- 答案 :默认是 LT,LT 在数据未处理完时会持续通知;ET 仅在状态变化时通知一次,需配合非阻塞 I/O 使用
细节2:Poll 方法返回的活跃事件是如何处理的?
- 答案 :遍历
epoll_wait返回的事件,填充到active列表中,并设置 Channel 的_revents
细节3:Poller 是否支持多线程同时调用 Poll 方法?
- 答案 :不支持,需通过锁或每个线程使用独立的 epoll 实例(如 Reactor 模式)
细节3:epoll_wait 的超时时间为何设置为 -1?是否合理?
- 答案 :-1 表示无限等待,适合服务器模型;但需根据业务需求调整,如设置超时处理定时任务
3. 与 Channel 的整合测试
由于我们有些东西,需要 Poller 来结合测试,所以对 Channel 模块也需要做一些修改,如下:
classPoller;classChannel{public:using EventCallback = std::function<void()>;// 注意: 这里不能放在 private 中, 否则会报错explicitChannel(Poller*poller,int fd):_fd(fd),_events(0),_revents(0),_poller(poller){}// 显式调用 ~Channel(){if(_fd !=-1){close(_fd); _fd =-1;// 避免重复关闭}}intFd()const{return _fd;}uint32_tEvents()const{return _events;}// 判断当前事件是否可读写boolReadAble()const{return(_events & EPOLLIN);}boolWriteAble()const{return(_events & EPOLLOUT);}// 设置回调函数voidSetReadCallback(const EventCallback& cb){_read_callback = cb;}voidSetWriteCallback(const EventCallback& cb){_write_callback = cb;}voidSetCloseCallback(const EventCallback& cb){_close_callback = cb;}voidSetErrorCallback(const EventCallback& cb){_error_callback = cb;}voidSetEventCallback(const EventCallback& cb){_event_callback = cb;}/* 监控事件开关 -- 进行事件监控连接后, 描述符就绪事件, 设置实际就绪事件*/voidSetREvents(uint32_t events){ _revents = events;}/* 开启事件监控 */voidEnableRead(){ _events |= EPOLLIN;Update();}voidEnableWrite(){ _events |= EPOLLOUT;Update();}/* 关闭事件监控 */voidDisableRead(){ _events &=~EPOLLIN;Update();}voidDisableWrite(){ _events &=~EPOLLOUT;Update();}voidDisableAll(){_events =0;Update();}// 关闭所有事件/* 后面调用 EventLoop 接口来移除事件监控 */voidRemove();// 移除事件voidUpdate();// 更新事件voidHandleEvent(){// 处理事件, 判断连接触发了什么事件 // 实际项目中, 连接断开并不会直接先断开, 还是看到是否有数据可读, 先读完数据或者发送出错再断开if((_revents & EPOLLIN)||(_revents & EPOLLRDHUP)||(_revents & EPOLLPRI)){// 不管任何事件, 都会调用的回调函数if(_event_callback)_event_callback();if(_read_callback)_read_callback();}elseif(_revents & EPOLLOUT){// 不管任何事件, 都会调用的回调函数if(_event_callback)_event_callback();if(_write_callback)_write_callback();}elseif(_revents & EPOLLERR){if(_error_callback)_error_callback();}elseif(_revents & EPOLLHUP){if(_close_callback)_close_callback();}}private:int _fd;// 事件对应的文件描述符uint32_t _events;// 当前需要监控的事件uint32_t _revents;// 当前连接触发的事件 Poller* _poller;// 事件循环 EventCallback _read_callback;// 读事件回调 EventCallback _write_callback;// 写事件回调 EventCallback _close_callback;// 关闭事件回调 EventCallback _error_callback;// 错误事件回调 EventCallback _event_callback;// 任意事件回调};voidChannel::Remove(){return _poller->RemoveEvent(this);}// 移除事件voidChannel::Update(){return _poller->UpdateEvent(this);}// 更新事件测试代码如下:
server.cpp
#include"../../source/server.hpp"voidHandleClose(Channel *channel){ std::cout <<"HandleClose: "<< channel->Fd()<< std::endl; channel->Remove();// 移除事件delete channel;// 释放内存}voidHandleRead(Channel *channel){int fd = channel->Fd();char buf[1024]={0}; ssize_t ret =recv(fd, buf,1023,0);if(ret <=0){returnHandleClose(channel);// 关闭事件} std::cout << buf << std::endl; channel->EnableWrite();// 启动可写事件}voidHandleWrite(Channel *channel){int fd = channel->Fd();constchar*data ="I miss You"; ssize_t ret =send(fd, data,strlen(data),0);if(ret <0){returnHandleClose(channel);// 关闭事件} channel->DisableWrite();// 关闭可写事件}voidHandleError(Channel *channel){returnHandleClose(channel);}voidHandlEvent(Channel *channel){ std::cout <<"有了一个事件"<< std::endl;}voidAcceptor(Poller* poller, Channel *lst_channel){int fd = lst_channel->Fd();int newfd =accept(fd,nullptr,nullptr);if(newfd <0)return; Channel *channel =newChannel(poller, newfd); channel->SetReadCallback(std::bind(HandleRead, channel));// 为通信套接字设置可读事件回调函数 channel->SetWriteCallback(std::bind(HandleWrite, channel));// 可写事件的回调函数 channel->SetCloseCallback(std::bind(HandleClose, channel));// 关闭事件的回调函数 channel->SetErrorCallback(std::bind(HandleError, channel));// 错误事件的回调函数 channel->SetEventCallback(std::bind(HandlEvent, channel));// 任意事件的回调函数 channel->EnableRead();// 监听读事件 }intmain(){ Poller poller; Socket lst_sock; lst_sock.CreateServer(8080);// 为监听套接字, 创建一个 Channel 进行事件的管理及处理 Channel channel(&poller, lst_sock.Fd()); channel.SetReadCallback(std::bind(Acceptor,&poller,&channel));// 设置监听套接字的可读事件回调函数 channel.EnableRead();while(1){ std::vector<Channel*> actives; poller.Poll(&actives);// 开始监控, 返回活跃连接for(auto& a: actives){ a->HandleEvent();// 处理事件}} lst_sock.Close();return0;}client.cpp
#include"../../source/server.hpp"intmain(){ Socket cli_sock; cli_sock.CreateClient(8080,"127.0.0.1");while(1){ std::string str ="Hello IsLand"; cli_sock.Send(str.c_str(), str.size());char buf[1024]={0}; cli_sock.Recv(buf,1023);LOG_DEBUG("%s", buf);sleep(1);}return0;}结果如下:
lighthouse@VM-8-10-ubuntu:Test2$ ./client 2025-05-04 21:54:10 [tcp_cli.cc:13] I miss You 2025-05-04 21:54:11 [tcp_cli.cc:13] I miss You ^C lighthouse@VM-8-10-ubuntu:Test2$ ./server 有了一个事件 Hello IsLand 有了一个事件 有了一个事件 Hello IsLand 有了一个事件 有了一个事件 HandleClose: 5六、EventLoop 模块实现
1. 关于 evenfd 函数
eventfd 是 Linux 提供的一种轻量级的进程间通信(IPC)机制,用于在进程或线程之间传递事件通知。它通过一个文件描述符来实现计数器的功能,支持读写操作,适合用于事件通知或信号量的实现
1.1 函数概述
#include<sys/eventfd.h>inteventfd(unsignedint initval,int flags);initval: 初始化计数器的值(uint64_t类型)。这是eventfd的初始计数值。flags:
用于设置文件描述符的行为,常见的标志包括:EFD_CLOEXEC: 设置close-on-exec标志,表示在调用exec系列函数时自动关闭该文件描述符,禁止进程复制EFD_NONBLOCK: 设置非阻塞模式,读写操作不会阻塞。EFD_SEMAPHORE: 启用信号量语义(每次读取时计数器减 1,而不是清零)
返回值
- 成功时返回一个文件描述符(
efd),可以通过read和write操作与eventfd交互(注意:read&write 进行 IO 的时候数据只能是 一个 8 字节数据 ) - 失败时返回
-1,并设置errno以指示错误原因。
功能
- 写入计数器:
- 使用
write向eventfd写入一个uint64_t值,计数器会累加该值。 - 如果计数器的值超过
UINT64_MAX,会返回错误EOVERFLOW。
- 使用
- 读取计数器:
- 使用
read从eventfd读取一个uint64_t值:- 如果未设置
EFD_SEMAPHORE,读取操作会返回计数器的当前值,并将计数器清零。 - 如果设置了
EFD_SEMAPHORE,每次读取会返回1,并将计数器减 1。
- 如果未设置
- 如果计数器为 0 且未设置
EFD_NONBLOCK,read会阻塞;如果设置了EFD_NONBLOCK,则返回-1并设置errno为EAGAIN。
- 使用
1.2 代码示例
#include<stdio.h>#include<stdint.h>#include<unistd.h>#include<fcntl.h>#include<sys/eventfd.h>intmain(){int efd =eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd <0){perror("eventfd");return-1;}uint64_t val =1;write(efd,&val,sizeof(val));write(efd,&val,sizeof(val));write(efd,&val,sizeof(val));uint64_t res =0;read(efd,&res,sizeof(res));printf("res = %lu\n", res);close(efd);return0;}// 结果如下: lighthouse@VM-8-10-ubuntu:eventfd$ ./ev res =3分析如下:
int efd =eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);- 创建一个
eventfd,初始计数器值为0。 - 设置了
EFD_CLOEXEC和EFD_NONBLOCK:EFD_CLOEXEC: 在调用exec系列函数时自动关闭文件描述符。EFD_NONBLOCK: 使读写操作非阻塞。
uint64_t val =1;write(efd,&val,sizeof(val));write(efd,&val,sizeof(val));write(efd,&val,sizeof(val));- 向
eventfd写入1三次,计数器的值累加为3。
uint64_t res =0;read(efd,&res,sizeof(res));printf("res = %lu\n", res);- 从
eventfd读取计数器的值,res被设置为3,同时计数器清零。
输出结果为:
res = 3 close(efd);- 关闭
eventfd文件描述符,释放资源。
1.3 使用场景及注意事项
常见用途
- 线程间同步:
- 使用
eventfd实现生产者-消费者模型或信号量机制。
- 使用
- 事件通知:
- 在多线程或多进程环境中,用于通知某些事件的发生。
- 与
epoll配合:- 将
eventfd文件描述符加入epoll,用于事件驱动的程序中。
- 将
注意事项
- 计数器溢出:
- 如果计数器的值超过
UINT64_MAX,write会返回错误EOVERFLOW
- 如果计数器的值超过
- 非阻塞模式:
- 如果设置了
EFD_NONBLOCK,在计数器为 0 时调用read会返回-1并设置errno为EAGAIN
- 如果设置了
- 信号量模式:
- 如果设置了
EFD_SEMAPHORE,每次读取会返回1,并将计数器减 1,而不是清零
- 如果设置了
2. Eventloop 模块概述
Eventloop:进行事件监控,以及事件处理的模块(关键点:这个模块和线程是一一对应的)
- 监控了一个连接,而这个连接一旦就绪,就要进行事件处理。但是如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题
- 因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行
如何保证一个连接的所有操作都在 eventloop 对应的线程中
- 解决方案:给
eventloop模块中,添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中
eventloop 处理流程:
- 在线程中对描述符进行事件监控
- 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
- 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务–执行
事件监控
- 事件监控:使用 Poller 模块,有事件就绪则进行事件处理
- 执行任务队列中的任务:一个线程安全的任务队列
注意:由于有可能因为等待描述符 IO 事件就绪,导致执行流流程阻塞,这时候任务队列中的任务得不到指向
- 因此需要需要有一个事件通知的东西,能够唤醒事件监控的阻塞
代码实现
classEventLoop{public:voidRunAllTask(){// 在加锁期间取出所有任务, 给锁限定作用域 std::vector<Functor> tasks;{ std::lock_guard<std::mutex>lock(_mutex);// 加锁 tasks.swap(_tasks);// 交换任务池, 取出所有任务}for(auto&t: tasks){t();// 执行任务}return;}staticintCreateEventfd(){int efd =eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if(efd <0){LOG_ERROR("CREATE EVENTFD ERROR");abort();// 退出程序}return efd;}voidReadEventFd(){uint64_t data =0; ssize_t ret =read(_event_fd,&data,sizeof(data));if(ret <0){if(errno == EAGAIN || errno == EINTR){return;// 没有数据可读}LOG_ERROR("READ EVENTFD ERROR");abort();// 退出程序}return;}// 唤醒事件循环voidWakeupEventFd(){uint64_t data =1; ssize_t ret =write(_event_fd,&data,sizeof(data));if(ret <0){if(errno == EAGAIN || errno == EINTR){return;// 没有数据可读}LOG_ERROR("WRITE EVENTFD ERROR");abort();// 退出程序}return;}public:using Functor = std::function<void()>;EventLoop():_thread_id(std::this_thread::get_id()),// 获取当前线程 ID_event_fd(CreateEventfd()),// 创建 eventfd 唤醒 IO 事件监控_event_channel(newChannel(this, _event_fd))// 创建事件循环的 Channel{//给eventfd添加可读事件回调函数,读取eventfd事件通知次数 _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd,this)); _event_channel->EnableRead();// 设置可读事件}// 判断当前线程是否是 EventLoop 中对应线程boolIsInLoop(){return _thread_id == std::this_thread::get_id();}// 修改/添加 描述符的事件监控voidUpdateEvent(Channel* channel){assert(IsInLoop());// 断言: 当前线程是事件循环线程 _poller.UpdateEvent(channel);// 修改/添加事件监控}voidRemoveEvent(Channel* channel){assert(IsInLoop()); _poller.RemoveEvent(channel);// 移除事件监控}// 事件监控->就绪事件处理->执行任务voidStart(){// 1, 事件监控 std::vector<Channel*> actives;// 活跃连接 _poller.Poll(&actives);// 进行事件监控// 2, 事件处理for(auto&channel: actives){ channel->HandleEvent();// 处理事件}// 3, 执行任务RunAllTask();// 执行任务}// 压入任务队列voidQueueInLoop(const Functor& cb){{ std::lock_guard<std::mutex>lock(_mutex);// 加锁 _tasks.emplace_back(cb);// 压入任务}// 唤醒事件循环 -- 由于没有事件就绪 导致的 epoll 阻塞// 其实就是给 eventfd 写入一个数据, 使得 epoll 事件就绪WakeupEventFd();}// 判断要执行任务是否处于当前线程, 如果是则执行, 不是则压入队列voidRunInLoop(const Functor& cb){if(IsInLoop()){cb();}else{QueueInLoop(cb);}}private: std::thread::id _thread_id;// 事件循环线程 IDint _event_fd;// eventfd 唤醒 IO 事件监控可能导致的阻塞// 注意: 这里的 Channel用智能指针进行管理, Poller 使用的对象 std::unique_ptr<Channel> _event_channel;// 事件循环的 Channel Poller _poller;// 进行所有描述符的事件监控 std::vector<Functor> _tasks;// 任务池 std::mutex _mutex;// 互斥锁};// 注意: 这里的 Channel 类也要做一些改变, 类似于 Poller 模块的处理改变voidChannel::Remove(){return _loop->RemoveEvent(this);}// 移除事件voidChannel::Update(){return _loop->UpdateEvent(this);}// 更新事件3. 与 TimeWheel 模块整合
由于我们需要用到我们之前所说的 TimeWheel 模块,并且对其做一些改变
- 将定时器任务与事件循环绑定 :确保定时器回调在
EventLoop线程中执行,避免线程安全问题 - 利用事件驱动机制:通过
timerfd触发定时任务,与epoll事件监控无缝结合 - 支持任务的添加、刷新、取消和周期性执行
① TimerWheel 与 EventLoop 的绑定
- 绑定关系 :
TimerWheel依赖于一个EventLoop实例,所有定时任务的执行都通过该EventLoop的线程完成 - 事件驱动 :通过
timerfd的可读事件触发定时任务处理(OnTime)
TimerWheel 构造函数 :
TimerWheel(EventLoop *loop):_capacity(60),_tick(0),_loop(loop),_timerfd(CreateTimerfd()),_timer_channel(newChannel(_loop, _timerfd)){ _wheel.resize(_capacity); _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime,this)); _timer_channel->EnableRead();// 启动读事件监控}② 定时器任务的执行流程
- 触发机制 :当
timerfd被触发时,OnTime会被调用,读取超时次数并依次处理每个 tick 的任务。 - 线程安全 :
OnTime是Channel的读事件回调,由EventLoop的线程调用,确保所有定时任务在事件循环线程中执行。 - 任务执行 :遍历当前 tick 的所有任务,执行回调函数。
- 资源管理 :通过
_release删除TimerWheel中保存的任务映射,避免内存泄漏。
RunTimerTask 函数 :
voidRunTimerTask(){ _tick =(_tick +1)% _capacity;auto& tasks = _wheel[_tick];for(auto& task : tasks){if(!task->_canceled){ task->_cb();// 执行回调} task->_release();// 释放资源} tasks.clear();// 清空当前 tick 的任务}OnTime 函数
voidRunTimerTask(){ _tick =(_tick +1)% _capacity;auto& tasks = _wheel[_tick];for(auto& task : tasks){if(!task->_canceled){ task->_cb();// 执行回调} task->_release();// 释放资源} tasks.clear();// 清空当前 tick 的任务}③ 定时器任务的添加与刷新
- 任务封装 :使用
shared_ptr管理任务生命周期,weak_ptr避免循环引用。 - 位置计算 :根据当前
tick和延迟时间delay计算任务在轮子中的位置。 - 重新插入 :将任务移动到新的 tick 位置,实现延迟效果。
刷新任务 :
voidTimerRefreshInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end())return; PtrTask pt = it->second.lock();if(!pt)return;int remaining = pt->DelayTime();int pos =(_tick + remaining)% _capacity; _wheel[pos].push_back(pt);// 重新插入任务}添加任务 :
voidTimerAddInLoop(uint64_t id,uint32_t delay,const TaskFunc& cb){ PtrTask pt(newTimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this, id));int pos =(_tick + delay)% _capacity; _wheel[pos].push_back(pt);// 将任务添加到轮子中 _timers[id]=WeakTask(pt);// 保存任务映射}④ 线程安全保证
- 任务队列机制 :
QueueInLoop方法 :所有对Channel或定时器的操作都通过EventLoop::QueueInLoop提交到任务队列。RunInLoop方法 :确保操作在事件循环线程中执行。WakeupEventFd:通过写入eventfd唤醒阻塞的epoll_wait,及时处理任务队列。
- 定时器回调的线程一致性 :
TimerTask 析构函数 :
~TimerTask(){if(!_canceled)_cb();// 直接执行回调_release();}关键点 :_cb() 的执行必须在 EventLoop 线程中,通过 TimerWheel::OnTime 触发,无需额外线程同步
代码整合示例
classEventLoop{public:// 添加定时器任务voidAddTimer(uint64_t id,uint32_t delay,const TaskFunc &cb){return _timer_wheel.AddTimer(id, delay, cb);}// 刷新定时器voidRefreshTimer(uint64_t id){return _timer_wheel.RefreshTimer(id);}// 取消定时器voidCancelTimer(uint64_t id){return _timer_wheel.CancelTimer(id);}private: TimerWheel _timer_wheel;// 定时器轮};TimerWheel 类关键函数
classTimerWheel{public:// 添加定时任务(供 EventLoop 调用)voidAddTimer(uint64_t id,uint32_t delay,const TaskFunc& cb){ _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop,this, id, delay, cb));}// 刷新定时任务voidRefreshTimer(uint64_t id){ _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop,this, id));}// 取消定时任务voidCancelTimer(uint64_t id){ _loop->RunInLoop(std::bind(&TimerWheel::TimerCanceInLoop,this, id));}private:// 实际添加任务的逻辑voidTimerAddInLoop(uint64_t id,uint32_t delay,const TaskFunc& cb){ PtrTask pt(newTimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this, id));int pos =(_tick + delay)% _capacity; _wheel[pos].push_back(pt); _timers[id]=WeakTask(pt);}// 实际刷新任务的逻辑voidTimerRefreshInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end())return; PtrTask pt = it->second.lock();if(!pt)return;int remaining = pt->DelayTime();int pos =(_tick + remaining)% _capacity; _wheel[pos].push_back(pt);}// 实际取消任务的逻辑voidTimerCanceInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end())return; PtrTask pt = it->second.lock();if(pt) pt->Cancel();}// 移除任务voidRemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()) _timers.erase(it);}// 执行定时任务voidRunTimerTask(){ _tick =(_tick +1)% _capacity;auto& tasks = _wheel[_tick];for(auto& task : tasks){if(!task->_canceled) task->_cb();// 执行回调 task->_release();// 释放资源} tasks.clear();// 清空当前 tick 的任务}// 定时器事件回调voidOnTime(){int times =ReadTimerfd();for(int i =0; i < times;++i){RunTimerTask();// 执行定时任务}}// 创建 timerfdstaticintCreateTimerfd(){int timerfd =timerfd_create(CLOCK_MONOTONIC,0);if(timerfd <0){LOG_ERROR("Create timerfd error");abort();}structitimerspec itime; itime.it_value.tv_sec =1; itime.it_value.tv_nsec =0; itime.it_interval.tv_sec =1; itime.it_interval.tv_nsec =0;timerfd_settime(timerfd,0,&itime,nullptr);return timerfd;}// 读取 timerfdintReadTimerfd(){uint64_t times =0; ssize_t ret =read(_timerfd,×,sizeof(times));if(ret <0){LOG_ERROR("READ TIMERFD ERROR");abort();}return times;}private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;constint _capacity;int _tick;int _timerfd; std::unique_ptr<Channel> _timer_channel; EventLoop* _loop; std::vector<std::vector<PtrTask>> _wheel;// 定时器轮 std::unordered_map<uint64_t, WeakTask> _timers;// ID 映射};4. 代码测试
server.cpp
#include"../../source/server.hpp"voidHandleClose(Channel *channel){LOG_DEBUG("close fd: %d", channel->Fd()); channel->Remove();// 移除事件delete channel;// 释放内存}voidHandleRead(Channel *channel){int fd = channel->Fd();char buf[1024]={0}; ssize_t ret =recv(fd, buf,1023,0);if(ret <=0){returnHandleClose(channel);// 关闭事件}LOG_DEBUG("Read: %s", buf); channel->EnableWrite();// 启动可写事件}voidHandleWrite(Channel *channel){int fd = channel->Fd();constchar*data ="I miss You"; ssize_t ret =send(fd, data,strlen(data),0);if(ret <0){returnHandleClose(channel);// 关闭事件} channel->DisableWrite();// 关闭可写事件}voidHandleError(Channel *channel){returnHandleClose(channel);}voidHandlEvent(EventLoop* loop, Channel *channel,uint64_t timerid){ loop->RefreshTimer(timerid);// 刷新定时器}voidAcceptor(EventLoop* loop, Channel *lst_channel){int fd = lst_channel->Fd();int newfd =accept(fd,nullptr,nullptr);if(newfd <0)return;uint64_t timerid =rand()%1000; Channel *channel =newChannel(loop, newfd); channel->SetReadCallback(std::bind(HandleRead, channel));// 为通信套接字设置可读事件回调函数 channel->SetWriteCallback(std::bind(HandleWrite, channel));// 可写事件的回调函数 channel->SetCloseCallback(std::bind(HandleClose, channel));// 关闭事件的回调函数 channel->SetErrorCallback(std::bind(HandleError, channel));// 错误事件的回调函数 channel->SetEventCallback(std::bind(HandlEvent, loop, channel, timerid));// 任意事件的回调函数// 非活跃连接的超时释放操作 -- 5s 后关闭// 注意: 定时销毁任务必须在启动读事件之前, 因为读事件会启动可写事件, 但这个时候还没有任务 loop->AddTimer(timerid,5, std::bind(HandleClose, channel)); channel->EnableRead();// 监听读事件 }intmain(){srand(time(nullptr));// 随机数种子 EventLoop loop; Socket lst_sock; lst_sock.CreateServer(8080);// 为监听套接字, 创建一个 Channel 进行事件的管理及处理 Channel channel(&loop, lst_sock.Fd()); channel.SetReadCallback(std::bind(Acceptor,&loop,&channel));// 设置监听套接字的可读事件回调函数 channel.EnableRead();while(1){ loop.Start();// 事件循环} lst_sock.Close();return0;}client.cpp
#include"../../source/server.hpp"intmain(){ Socket cli_sock; cli_sock.CreateClient(8080,"127.0.0.1");for(int i =0; i <3;++i){ std::string str ="Hello IsLand"; cli_sock.Send(str.c_str(), str.size());char buf[1024]={0}; cli_sock.Recv(buf,1023);LOG_DEBUG("%s", buf);sleep(1);}while(1)sleep(1);return0;}结果如下:
lighthouse@VM-8-10-ubuntu:Test4$ ./client 2025-05-0522:53:49[tcp_cli.cc:13] I miss You 2025-05-0522:53:50[tcp_cli.cc:13] I miss You 2025-05-0522:53:51[tcp_cli.cc:13] I miss You ^C lighthouse@VM-8-10-ubuntu:Test4$ ./client 2025-05-0522:54:00[tcp_cli.cc:13] I miss You 2025-05-0522:54:01[tcp_cli.cc:13] I miss You 2025-05-0522:54:02[tcp_cli.cc:13] I miss You ^C lighthouse@VM-8-10-ubuntu:Test4$ ./server 2025-05-0522:53:49[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:53:50[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:53:51[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:53:55[tcp_srv.cc:4] close fd:72025-05-0522:54:00[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:54:01[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:54:02[tcp_srv.cc:16] Read: Hello IsLand 2025-05-0522:54:04[tcp_srv.cc:4] close fd:7^C 5. 细节分析
细节1:定时器任务中异步执行回调
由于我之前在实现 TimeWheel 代码是这样写的,如下:
~TimerTask(){if(!_canceled){ std::thread(_cb).detach();// ❌ 异步执行回调}_release();}这个写法会导致定时器回调(如 HandleClose)在子线程中执行 ,而不是在 EventLoop 所属的线程中执行,然后就出现了如下的问题:
Assertion `IsInLoop()' failed.Aborted(core dumped)根本原因 是:在非事件循环线程中调用了 RemoveEvent 或 Channel::Remove() ,而 EventLoop 的所有操作都要求必须在事件循环线程中执行(通过 assert(IsInLoop()) 检查)
分析
- HandleClose 函数内部调用了:
channel->Remove(); // 会调用 EventLoop::RemoveEvent - 此时
RemoveEvent中有断言:assert(IsInLoop()); // 检查当前线程是否是事件循环线程 - 由于回调在子线程中执行,断言失败,程序崩溃
细节2:服务器端关闭再启动的文件描述符(fd)不变
上面我们演示的时候,可以发现,当我们服务器端关闭再启动之后 fd 并没有发生改变,如下:
2025-05-05 22:53:55 [tcp_srv.cc:4] close fd: 72025-05-05 22:54:04 [tcp_srv.cc:4] close fd: 7Linux 系统中,文件描述符的分配遵循 “最小可用原则” ,即:
- 总是分配当前进程中最小的未被占用的整数 fd
- 当一个 fd 被关闭后,它会被标记为“可重用”,下次分配新文件或 socket 时会优先使用这些被释放的 fd
分析
- 客户端连接流程
- 每次运行
client,都会创建一个新的 socket,系统返回一个可用的 fd。 - 由于服务器端在连接关闭时 主动关闭了 socket 并释放了 fd ,因此下一次客户端连接时,系统会优先使用刚刚释放的 fd(例如
7)。
- 服务器端处理连接的方式
newfd是系统分配的文件描述符- 如果前一个连接的
newfd刚好是7,并且已经被关闭(close(7)),那么下一个新连接就会再次分配7
在 Acceptor 函数中,每当有新连接到来时:
int newfd =accept(fd,nullptr,nullptr); Channel *channel =newChannel(loop, newfd);- Channel 的析构行为
- 这意味着每次连接关闭时,
Channel对象被销毁时会调用close(fd),从而释放该 fd - 释放后,系统可以再次分配该 fd 给新的连接
你定义的 Channel 类析构函数会 主动关闭 fd :
~Channel(){if(_fd !=-1){close(_fd); _fd =-1;}}补充(关于这种做法的意义)
- 无需担心 fd 复用问题 :只要每次连接关闭时正确调用
close(fd),系统会安全地回收和复用 fd - 避免 fd 泄漏 :确保所有连接关闭时都正确删除
Channel对象,防止 fd 被占用不释放
细节3:Channel 类中的 Remove 和 Update 方法为何调用 EventLoop 的接口?
问题本质:模块职责分离
回答要点:
- 职责分离 :
Channel仅负责事件注册,Poller负责底层 I/O 事件监控。 - 统一管理 :通过
EventLoop统一管理事件增删改,确保事件状态一致性。
示例代码 :
voidChannel::Remove(){return _loop->RemoveEvent(this);}voidChannel::Update(){return _loop->UpdateEvent(this);}细节4:如何避免定时器任务的重复添加?
问题本质:资源泄漏与逻辑错误
回答要点:
HasTimer检查 :在添加定时任务前调用HasTimer(id)避免重复。- 刷新替代新增 :若定时任务已存在,调用
RefreshTimer(id)延迟销毁时间。 - 线程安全 :所有定时器操作通过
EventLoop串行化执行。
