C++ 仿 Muduo 库:Server 服务器模块实现(上)
一、Buffer 模块
本质:缓冲区模板
功能:存储数据,取出数据
实现思想:
- 实现缓冲区得有一块内存空间,采用
vector<char>,vector 底层其实使用的是一个线性的内存空间。 - 要素:
- 默认的空间大小
- 当前的读取数据位置
- 当前的写入数据位置
- 操作:
- 写入数据:当前写入位置指向哪里,就从哪里开始写入。如果后续剩余空闲空间不够了,考虑缓冲区空闲空间是否足够(因为读位置也会向后偏移,也就是说前面也可能有空闲空间)。
- 足够:将数据移动到起始位置即可。
- 不够:扩容,从当前写位置开始扩容足够大小。
- 数据一旦写入成功,当前写位置就要往后偏移。
- 读取数据:当前读取位置指向哪里,就从哪里开始读取,前提是有数据可读。
- 可读数据大小:当前写入位置 - 当前读取位置。
- 写入数据:当前写入位置指向哪里,就从哪里开始写入。如果后续剩余空闲空间不够了,考虑缓冲区空闲空间是否足够(因为读位置也会向后偏移,也就是说前面也可能有空闲空间)。
设计如下:
class Buffer {
public:
// 1. 获取当前写位置地址
// 2. 确保可写空间足够
// 3. 获取前沿空间大小
// 4. 获取后沿空间大小
// 5. 将写位置向后移动指定长度
// 6. 获取当前读位置地址
// 7. 获取可读数据大小
// 8. 将读位置向后移动指定长度
// 9. 清理功能
private:
std::vector<char> _buffer;
uint64_t _read_idx; // 相对读偏移
uint64_t _write_idx; // 相对写偏移
};
代码如下:
class Buffer {
private:
// 注意:这里的起始地址:_buffer.data() 或者 &*_buffer.begin() 都可以
char *Begin() { return &*_buffer.begin(); }
void ReadData(void *data, uint64_t len) {
assert(len <= ReadableSize());
std::copy(GetReadPos(), GetReadPos() + len, (char*)data);
MoveReadOffset(len);
}
void WriteData(const void *data, uint64_t len) {
if (len <= 0) return;
EnsureWriteSpace(len);
const char *d = static_cast<const char *>(data);
std::copy(d, d + len, GetWritePos());
MoveWriteOffset(len);
}
void WriteBuffer(Buffer &buf) {
return WriteData(buf.GetReadPos(), buf.ReadableSize());
}
void WriteString( std::string &str) {
(str.(), str.());
}
:
( size = ) : _reader_idx(), _writer_idx() {
_buffer.(size);
}
{ () + _writer_idx; }
{ () + _reader_idx; }
{
(len <= ());
_reader_idx += len;
}
{
(len <= () + ());
_writer_idx += len;
}
{
*pos = ();
(pos == ) { ; }
(pos - () + );
}
{ _buffer.() - _writer_idx; }
{ _reader_idx; }
{ _writer_idx - _reader_idx; }
{
(() >= len) ;
(len <= () + ()) {
readable_size = ();
std::((), () + readable_size, ());
_writer_idx = readable_size;
_reader_idx = ;
} {
new_size = _buffer.() * ;
(new_size < len) { new_size *= ; }
_buffer.(new_size);
}
}
{
(data, len);
(len);
}
{
(str);
(str.());
}
{
(buf);
(buf.());
}
{
(buf, len);
(len);
}
{
(len <= ());
std::string str;
str.(len);
(&str[], len);
str;
}
{
std::string str = (len);
(len);
str;
}
{
*res = (*)std::((), , ());
res;
}
{
std::string str = ();
(str.());
str;
}
{
_buffer.();
_reader_idx = ;
_writer_idx = ;
}
:
std::vector<> _buffer;
_reader_idx;
_writer_idx;
};
测试代码如下:
int main() {
Buffer buffer;
for (int i = 0; i < 300; i++) {
std::string str = "hello world" + std::to_string(i) + "\n";
buffer.WriteStringAndPush(str);
}
while (buffer.GetReadableSize() > 0) {
std::string line = buffer.GetLineAndPop();
std::cout << "Line: " << line << std::endl;
}
return 0;
}
二、日志模块
详情可以参考相关技术文档。
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL -1
#define LOG(level, format, ...) do { \
if (level < LOG_LEVEL) break; \
time_t t = time(nullptr); \
struct tm* tm = localtime(&t); \
char buf[64]; \
strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", tm); \
printf("%s [%s:%d] " format "\n", buf, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)
#define LOG_INFO(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define LOG_DEBUG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define LOG_ERROR(format, ...) LOG(ERR, format, ##__VA_ARGS__)
三、套接字 Socket 设计
1. 代码实现
// Socket 类
#define MAXLISTEN 1024
class Socket {
private:
bool Create() {
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0) {
LOG_ERROR("CREATE SOCKET ERROR");
return false;
}
return true;
}
bool Bind(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
LOG_ERROR("BIND SOCKET ERROR");
return false;
}
return true;
}
bool Listen(int backlog = MAXLISTEN) {
int ret = listen(_sockfd, backlog);
(ret < ) {
();
;
}
;
}
{
addr;
addr.sin_family = AF_INET;
addr.sin_port = (port);
addr.sin_addr.s_addr = (ip.());
len = ( sockaddr_in);
ret = (_sockfd, ( sockaddr*)&addr, len);
(ret < ) {
();
;
}
;
}
:
() : _sockfd() {}
~() { (); }
( sockfd) : _sockfd(sockfd) {}
( Socket&) = ;
Socket& =( Socket&) = ;
{ _sockfd; }
{
newfd = (_sockfd, , );
(newfd < ) {
();
;
}
newfd;
}
{
ret = (_sockfd, buf, len, flag);
(ret <= ) {
(errno == EAGAIN || errno == EINTR) {
;
}
(, (errno));
;
}
ret;
}
{
(buf, len, MSG_DONTWAIT);
}
{
ret = (_sockfd, buf, len, flag);
(ret < ) {
(errno == EAGAIN || errno == EINTR) {
;
}
(, (errno));
;
}
ret;
}
{
(len == ) ;
(buf, len, MSG_DONTWAIT);
}
{
(_sockfd != ) {
(_sockfd);
_sockfd = ;
}
}
{
opt = ;
(_sockfd, SOL_SOCKET, SO_REUSEADDR, (*)&opt, (opt));
opt = ;
(_sockfd, SOL_SOCKET, SO_REUSEPORT, (*)&opt, (opt));
}
{
flag = (_sockfd, F_GETFL, );
(flag == ) {
();
;
}
ret = (_sockfd, F_SETFL, flag | O_NONBLOCK);
(ret < ) {
();
;
}
}
{
(!()) ;
(nonblock_flag) ();
();
(!(ip, port)) ;
(!()) ;
;
}
{
(!()) ;
(!(ip, port)) ;
;
}
:
_sockfd;
};
2. 细节处理
细节 1:处理 Recv 函数时,errno 的来源以及为啥不用 EWOULDBLOCK
errno的来源:errno是 C 标准库中定义的全局变量,用于存储系统调用或库函数失败时的错误码。当recv返回-1时,具体的错误原因通过errno传递。EAGAIN和EWOULDBLOCK的关系:在 Linux 系统中,两者的值相同(均为 11),定义在/usr/include/asm-generic/errno-base.h中。
细节 2:MSG_DONTWAIT 的概述
在 NonBlockRecv 和 NonBlockSend 函数中,使用了 MSG_DONTWAIT 标志来实现非阻塞接收。即使当前套接字本身是阻塞模式,也会让本次 recv 调用立即返回。如果此时接收缓冲区中没有数据,recv 会返回 -1,并将 errno 设置为 EAGAIN 或 EWOULDBLOCK。
细节 3:关于 ReuseAddr()
SO_REUSEADDR:安全复用,允许同一地址和端口被多个套接字绑定(常用于快速重启服务)。SO_REUSEPORT:多进程共享端口,允许多个套接字绑定到完全相同的地址和端口(需所有套接字均设置此选项),用于负载均衡。- 建议:若无需多进程/线程共享端口,仅保留
SO_REUSEADDR。
为什么默认不允许端口复用? 每个 TCP/UDP 连接由五元组唯一标识。如果多个 socket 绑定到相同的地址和端口,系统将无法判断哪个 socket 应该处理新连接。
SO_REUSEADDR:允许'安全'的端口复用
- 用途:快速重启服务,避免 "Address already in use" 错误。
- 原理:允许绑定到已被其他 socket 使用的地址,但前提是原 socket 已关闭或也设置了该选项。
SO_REUSEPORT:允许多个 socket 同时绑定到相同地址和端口
- 用途:负载均衡,高并发场景。
- 原理:多个 socket 可以绑定到相同的地址和端口,内核负责将连接请求分发给各个 socket。
细节 4:宏污染
由于最开始的时候,日志实现代码和测试代码都用了相同的局部变量 char buf。虽然从语法上看,宏中的 buf 是局部变量,不会影响外部的 buf,但在某些编译器或特定优化条件下,栈内存的布局可能会导致 buf 被意外覆盖。建议修改宏内部变量名以避免冲突。
四、Channel 类设计
目的:对描述符的监控事件管理。
功能:
- 事件管理:描述符是否可读写,对描述符的监控可读可写,解除事件监控。
- 事件触发后处理的管理:需要处理的事件(可读,可写,挂断,错误,任意),事件处理的回调函数。
成员:
epoll进行事件监控:EPOLLIN(可读),EPOLLOUT(可写),EPOLLRDHUP(连接断开),EPOLLPRI(优先数据),EPOLLERR(出错),EPOLLHUP(挂断)。
1. 代码实现
class Channel {
public:
using EventCallback = std::function<void()>;
explicit Channel(Poller* poller, int fd) : _fd(fd), _events(0), _revents(0), _poller(poller) {}
~Channel() {
if (_fd != -1) {
close(_fd);
_fd = -1;
}
}
int Fd() const { return _fd; }
uint32_t Events() const { return _events; }
bool ReadAble() const { return (_events & EPOLLIN); }
bool WriteAble() const { return (_events & EPOLLOUT); }
void SetReadCallback(const EventCallback& cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; }
void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; }
{ _error_callback = cb; }
{ _event_callback = cb; }
{ _revents = events; }
{ _events |= EPOLLIN; (); }
{ _events |= EPOLLOUT; (); }
{ _events &= ~EPOLLIN; (); }
{ _events &= ~EPOLLOUT; (); }
{ _events = ; (); }
{ _poller->(); }
{ _poller->(); }
{
((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
(_event_callback) _event_callback();
(_read_callback) _read_callback();
} (_revents & EPOLLOUT) {
(_event_callback) _event_callback();
(_write_callback) _write_callback();
} (_revents & EPOLLERR) {
(_error_callback) _error_callback();
} (_revents & EPOLLHUP) {
(_close_callback) _close_callback();
}
}
:
_fd;
_events;
_revents;
Poller* _poller;
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _close_callback;
EventCallback _error_callback;
EventCallback _event_callback;
};
2. 细节处理
细节 1:在 HandleEvent 函数中使用 if-else if 结构而非多个独立的 if
使用 if-else if 是为了保障资源安全、明确事件优先级,并避免因同时处理多个事件导致的未定义行为。错误和挂起事件的优先级更高,且互斥处理可避免访问已释放对象。
五、Poller 模块实现
意义:通过 epoll 实现对描述符的 IO 事件监控。
功能:
- 添加 / 修改描述符的事件监控。
- 移除描述符的事件监控。
封装思想:
- 必须拥有一个 epoll 的操作句柄。
- 拥有一个
struct epoll_event的结构数组,监控时保存所有的活跃事件。 - 使用 hash 表管理描述符与描述符对应的事件管理
Channel对象。
1. 代码实现
#define MAX_EPOLLER_EVENTS 1024
class Poller {
private:
void Update(Channel* channel, int op) {
int fd = channel->Fd();
struct epoll_event event;
event.data.fd = fd;
event.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &event);
if (ret < 0) {
LOG_ERROR("EPOLL_CTL ERROR");
}
return;
}
bool HasChannel(Channel* channel) {
return _channels.find(channel->Fd()) != _channels.end();
}
public:
Poller() {
_epfd = epoll_create1(EPOLL_CLOEXEC);
if (_epfd < 0) {
LOG_ERROR("EPOLL_CREATE ERROR");
abort();
}
}
Poller(const Poller&) = delete;
Poller& operator=(const Poller&) = delete;
void UpdateEvent(Channel* channel) {
bool ret = HasChannel(channel);
(!ret) {
_channels.(std::(channel->(), channel));
(channel, EPOLL_CTL_ADD);
}
(channel, EPOLL_CTL_MOD);
}
{
it = _channels.(channel->());
(it != _channels.()) {
_channels.(it);
}
(channel, EPOLL_CTL_DEL);
}
{
nfds = (_epfd, _events, MAX_EPOLLER_EVENTS, );
(nfds < ) {
(errno == EINTR) {
;
}
(, (errno));
();
}
( i = ; i < nfds; ++i) {
fd = _events[i].data.fd;
it = _channels.(fd);
(it != _channels.());
Channel* channel = it->second;
channel->(_events[i].events);
active->(channel);
}
}
:
_epfd;
_events[MAX_EPOLLER_EVENTS];
std::unordered_map<, Channel*> _channels;
};
2. 细节处理
细节 1:epoll 是水平触发(LT)还是边缘触发(ET)?区别是什么?
答案:默认是 LT,LT 在数据未处理完时会持续通知;ET 仅在状态变化时通知一次,需配合非阻塞 I/O 使用。
细节 2:Poll 方法返回的活跃事件是如何处理的?
答案:遍历 epoll_wait 返回的事件,填充到 active 列表中,并设置 Channel 的 _revents。
细节 3:Poller 是否支持多线程同时调用 Poll 方法?
答案:不支持,需通过锁或每个线程使用独立的 epoll 实例。
细节 4:epoll_wait 的超时时间为何设置为 -1?是否合理?
答案:-1 表示无限等待,适合服务器模型;但需根据业务需求调整,如设置超时处理定时任务。
六、EventLoop 模块实现
1. 关于 eventfd 函数
eventfd 是 Linux 提供的一种轻量级的进程间通信(IPC)机制,用于在进程或线程之间传递事件通知。
1.1 函数概述
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
initval: 初始化计数器的值。flags: 常见标志包括EFD_CLOEXEC,EFD_NONBLOCK,EFD_SEMAPHORE。
1.2 代码示例
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/eventfd.h>
int main() {
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
perror("eventfd");
return -1;
}
uint64_t val = 1;
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
uint64_t res = 0;
read(efd, &res, sizeof(res));
printf("res = %lu\n", res);
close(efd);
return 0;
}
1.3 使用场景及注意事项
- 线程间同步:实现生产者 - 消费者模型或信号量机制。
- 事件通知:在多线程或多进程环境中,用于通知某些事件的发生。
- 与
epoll配合:将eventfd文件描述符加入epoll,用于事件驱动的程序中。
2. EventLoop 模块概述
EventLoop:进行事件监控,以及事件处理的模块(关键点:这个模块和线程是一一对应的)。
事件监控流程:
- 在线程中对描述符进行事件监控。
- 有描述符就绪则对描述符进行事件处理。
- 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务执行。
代码实现:
class EventLoop {
public:
using Functor = std::function<void()>;
EventLoop() : _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)) {
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
_event_channel->EnableRead();
}
bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
void UpdateEvent(Channel* channel) {
assert(IsInLoop());
_poller.UpdateEvent(channel);
}
void RemoveEvent(Channel* channel) {
assert(IsInLoop());
_poller.RemoveEvent(channel);
}
void RunAllTask() {
std::vector<Functor> tasks;
{
std::lock_guard<std::mutex> lock(_mutex);
tasks.swap(_tasks);
}
for (auto& t : tasks) {
t();
}
return;
}
{
efd = (, EFD_NONBLOCK | EFD_CLOEXEC);
(efd < ) {
();
();
}
efd;
}
{
data = ;
ret = (_event_fd, &data, (data));
(ret < ) {
(errno == EAGAIN || errno == EINTR) {
;
}
();
();
}
;
}
{
data = ;
ret = (_event_fd, &data, (data));
(ret < ) {
(errno == EAGAIN || errno == EINTR) {
;
}
();
();
}
;
}
{
std::vector<Channel*> actives;
_poller.(&actives);
(& channel : actives) {
channel->();
}
();
}
{
{
;
_tasks.(cb);
}
();
}
{
(()) {
();
} {
(cb);
}
}
:
std::thread::id _thread_id;
_event_fd;
std::unique_ptr<Channel> _event_channel;
Poller _poller;
std::vector<Functor> _tasks;
std::mutex _mutex;
};
3. 与 TimeWheel 模块整合
将定时器任务与事件循环绑定,确保定时器回调在 EventLoop 线程中执行。
TimerWheel 类关键函数:
class TimerWheel {
public:
void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void RefreshTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void CancelTimer(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
private:
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void TimerRefreshInLoop(uint64_t id) {
it = _timers.(id);
(it == _timers.()) ;
PtrTask pt = it->second.();
(!pt) ;
remaining = pt->();
pos = (_tick + remaining) % _capacity;
_wheel[pos].(pt);
}
{
it = _timers.(id);
(it != _timers.()) _timers.(it);
}
{
_tick = (_tick + ) % _capacity;
& tasks = _wheel[_tick];
(& task : tasks) {
(!task->_canceled) task->_cb();
task->_release();
}
tasks.();
}
{
times = ();
( i = ; i < times; ++i) {
();
}
}
{
timerfd = (CLOCK_MONOTONIC, );
(timerfd < ) {
();
();
}
itime;
itime.it_value.tv_sec = ;
itime.it_value.tv_nsec = ;
itime.it_interval.tv_sec = ;
itime.it_interval.tv_nsec = ;
(timerfd, , &itime, );
timerfd;
}
{
times = ;
ret = (_timerfd, ×, (times));
(ret < ) {
();
();
}
times;
}
:
WeakTask = std::weak_ptr<TimerTask>;
PtrTask = std::shared_ptr<TimerTask>;
_capacity;
_tick;
_timerfd;
std::unique_ptr<Channel> _timer_channel;
EventLoop* _loop;
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<, WeakTask> _timers;
};
4. 细节分析
细节 1:定时器任务中异步执行回调
如果在非事件循环线程中调用了 RemoveEvent 或 Channel::Remove(),而 EventLoop 的所有操作都要求必须在事件循环线程中执行(通过 assert(IsInLoop()) 检查),会导致程序崩溃。因此,定时器回调必须在 EventLoop 线程中执行。
细节 2:服务器端关闭再启动的文件描述符(fd)不变
Linux 系统中,文件描述符的分配遵循 '最小可用原则' 。当一个 fd 被关闭后,它会被标记为'可重用',下次分配新文件或 socket 时会优先使用这些被释放的 fd。
细节 3:Channel 类中的 Remove 和 Update 方法为何调用 EventLoop 的接口?
职责分离:Channel 仅负责事件注册,Poller 负责底层 I/O 事件监控。通过 EventLoop 统一管理事件增删改,确保事件状态一致性。
细节 4:如何避免定时器任务的重复添加?
HasTimer检查:在添加定时任务前调用HasTimer(id)避免重复。- 刷新替代新增:若定时任务已存在,调用
RefreshTimer(id)延迟销毁时间。 - 线程安全:所有定时器操作通过
EventLoop串行化执行。


