跳到主要内容C++ 异步 IO 库完整实现:从内核特性到业务封装案例 | 极客日志C++
C++ 异步 IO 库完整实现:从内核特性到业务封装案例
基于 Linux io_uring 的 C++ 异步 IO 库实现。内容涵盖传统 IO 模型对比、io_uring 核心设计(SQ/CQ 队列)、底层封装 AsyncIOUring、高层文件接口 AsyncFileIO 及日志系统 AsyncLogger。包含批量提交优化、零拷贝技术、线程安全设计及性能测试案例,适合希望深入系统编程与高性能 IO 的开发者参考。
猫巷少女4 浏览 本文介绍一个基于 io_uring 的生产级 C++ 异步 IO 库实现,涵盖内核特性、业务封装及性能优化案例。
一、io_uring:异步 IO
1.1 传统 IO 模型的困境
在深入 io_uring 之前,我们先回顾一下传统 IO 模型的问题。
**同步阻塞 IO(Blocking IO)**是最简单直观的模型。当调用 read() 或 write() 时,线程会一直阻塞,直到数据准备就绪或写入完成。在单线程程序中,这意味着整个程序停滞;在多线程程序中,虽然可以通过创建更多线程来提高并发度,但每个线程的上下文切换开销(通常 1-5 微秒)和内存开销(每个线程的栈空间至少几 MB)会迅速积累,当线程数达到数千时,系统开销将变得不可接受。
**同步非阻塞 IO(Non-blocking IO)+ 多路复用(epoll/select)**是 Linux 服务器开发的经典方案。通过 监听多个文件描述符的就绪事件,单个线程可以同时处理数千个连接。然而,这个模型对于文件 IO 并不理想,因为:
epoll_wait()
- 文件描述符在
epoll 中总是就绪的(无法真正监听磁盘 IO 完成)
- 实际的
read()/write() 调用仍然会阻塞线程,等待数据从磁盘加载到页缓存
**POSIX 异步 IO(aio)**试图解决这个问题,但其实现存在诸多缺陷:
- 不同 Linux 版本的行为不一致,可靠性差
- 只支持
O_DIRECT 模式(绕过页缓存),无法利用内核的缓存优化
- 性能并不理想,在某些场景下甚至不如同步 IO
- 接口复杂,难以使用
1.2 io_uring 的核心设计
io_uring 采用了全新的设计理念,其核心是两个无锁环形队列:
- 提交队列(Submission Queue, SQ):用户态向内核提交 IO 请求
- 完成队列(Completion Queue, CQ):内核向用户态返回 IO 完成事件
这两个队列通过 mmap 映射到用户态和内核态的共享内存中,实现了真正的零拷贝。用户程序只需要在共享内存中填写 SQE(Submission Queue Entry),然后通知内核即可;内核完成 IO 后,将 CQE(Completion Queue Entry)写入共享内存,用户程序直接读取结果。整个过程避免了数据在用户态和内核态之间的来回拷贝。
SQ 轮询模式(SQPOLL):内核创建一个专门的轮询线程,不断检查 SQ 中是否有新的 IO 请求。用户程序提交请求后甚至不需要执行系统调用来通知内核,内核会自动发现并处理。这将系统调用的开销降到了接近于零。
IO 轮询模式(IOPOLL):对于支持轮询的高性能设备(如 NVMe SSD),内核可以通过轮询设备寄存器来获取 IO 完成状态,而不是等待中断。这进一步降低了延迟,在某些场景下可以将 IO 延迟从几十微秒降到几微秒。
二、项目功能
2.1 项目包含的核心功能
第一层:AsyncIOUring - io_uring 底层封装
- 初始化和管理 io_uring 实例(支持 SQPOLL 和 IOPOLL 模式)
- 提交异步读、写、同步(fsync)等操作
- 独立的完成处理线程,自动回调用户函数
- 独立的提交线程,实现批量提交优化(减少系统调用)
- 完善的 IO 统计信息(请求数、字节数、延迟等)
- 线程安全的请求管理和状态跟踪
第二层:AsyncFileIO - 高层文件 IO 接口
- 自动管理文件描述符的打开和关闭
- 支持多种文件打开模式(只读、只写、读写、追加)
- 提供异步操作(回调风格)和同步操作(future 风格)两种接口
- 支持批量 IO 操作(一次提交多个读写请求)
- 内置 Direct IO 和顺序读取优化
- 自动维护文件偏移量
- 内部缓冲区,批量写入减少 IO 次数
- 自动日志文件轮转(按大小)
- 支持多级日志(DEBUG/INFO/WARNING/ERROR)
- 定期刷新线程,确保日志不丢失
- 完全线程安全,支持多线程高并发写入
2.2 可以学习到的核心知识
- 理解异步操作的本质:提交 - 回调模式
- 掌握如何设计异步 API(回调风格 vs future 风格 vs 协程风格)
- 学会处理异步操作中的错误传播和异常安全
- 理解异步 IO 中的资源生命周期管理(缓冲区何时可以释放)
- io_uring 的初始化、配置和清理流程
- SQE 和 CQE 的结构和使用方法
- 批量提交优化:何时提交、提交多少
- 完成事件的处理:轮询 vs 阻塞等待
- SQPOLL 和 IOPOLL 模式的适用场景
- 零拷贝技术:理解
mmap 共享内存,避免数据拷贝
- 批量操作:通过减少系统调用次数来提升性能
- 预读优化:使用
posix_fadvise 提示内核进行顺序读取优化
- Direct IO:何时绕过页缓存(大文件顺序 IO)
- 队列深度调优:队列深度对性能的影响
- 线程安全的数据结构:使用
std::atomic 和 std::mutex 保护共享状态
- 生产者 - 消费者模型:提交线程生产请求,完成线程消费结果
- 条件变量的正确使用:避免 busy-waiting,高效等待异步操作完成
- 锁粒度优化:减小临界区范围,提高并发度
- Linux 文件 IO 的各种标志(
O_DIRECT、O_APPEND、O_SYNC 等)
- 文件描述符的管理和 RAII 封装
fsync 与数据持久性保证
- 时间戳、性能统计、监控系统的实现
三、核心组件
3.1 AsyncIOUring:io_uring 的完整封装
位于 src/AsyncIOUring.cpp 和 include/AsyncIOUring.h。让我们逐步剖析其设计。
数据结构设计
class AsyncIOUring {
private:
struct io_uring ring_;
unsigned queue_depth_;
bool use_sqpoll_;
bool use_iopoll_;
bool initialized_;
std::atomic<bool> running_;
std::thread completion_thread_;
std::thread submit_thread_;
mutable std::mutex request_mutex_;
std::unordered_map<uint64_t, std::unique_ptr<IORequest>> pending_requests_;
std::atomic<uint64_t> next_req_id_;
IOStatsInternal stats_;
std::mutex submit_mutex_;
std::condition_variable completion_cv_;
std::atomic<int> pending_submissions_{0};
static constexpr int BATCH_SUBMIT_SIZE = 32;
};
completion_thread_:专门负责从 CQ 中获取完成事件并执行回调
submit_thread_:定期批量提交 SQ 中的请求
这样做的好处是用户线程调用 submitRead/submitWrite 时只需要准备 SQE,立即返回,不会被 IO 阻塞。实际的提交和完成处理都在后台异步进行。
2. 请求跟踪机制:每个 IO 请求都有唯一的 req_id
std::unordered_map<uint64_t, std::unique_ptr<IORequest>> pending_requests_;
当用户提交请求时,系统生成一个 ID 并保存请求信息(包括回调函数);当 IO 完成时,通过 ID 找回请求并执行回调。这是异步编程的典型模式。
int pending = pending_submissions_.fetch_add(1, std::memory_order_relaxed) + 1;
if (pending >= BATCH_SUBMIT_SIZE) {
int ret = io_uring_submit(&ring_);
pending_submissions_.store(0, std::memory_order_relaxed);
}
这段代码位于 submitRequest 函数中。只有当累积了 32 个请求(BATCH_SUBMIT_SIZE)时,才调用 io_uring_submit 向内核提交。这大大减少了系统调用的次数。同时,submit_thread_ 会每隔 50 微秒检查一次,如果有挂起的请求就提交,避免请求被无限期延迟。
初始化流程
bool AsyncIOUring::initialize() {
if (initialized_) return true;
unsigned flags = 0;
if (use_sqpoll_) flags |= IORING_SETUP_SQPOLL;
if (use_iopoll_) flags |= IORING_SETUP_IOPOLL;
int ret = io_uring_queue_init(queue_depth_, &ring_, flags);
if (ret < 0) {
std::cerr << "io_uring 初始化失败:" << strerror(-ret) << std::endl;
return false;
}
initialized_ = true;
running_ = true;
completion_thread_ = std::thread(&AsyncIOUring::completionThread, this);
submit_thread_ = std::thread(&AsyncIOUring::submitThread, this);
return true;
}
io_uring_queue_init 是 liburing 库提供的初始化函数,它会:
- 在内核中创建 io_uring 实例
- 分配 SQ 和 CQ 所需的内存
- 将这些内存
mmap 映射到用户态
- 设置必要的控制结构
queue_depth 参数决定了可以同时挂起的 IO 操作数量。对于小型应用,128 就足够;对于数据库等高并发场景,建议设置为 512 或 1024。
提交 IO 请求的核心流程
uint64_t AsyncIOUring::submitRead(int fd, void* buffer, size_t length, off_t offset, std::function<void(ssize_t, int)> callback) {
auto request = std::make_unique<IORequest>();
request->op_type = IOOpType::READ;
request->fd = fd;
request->buffer = buffer;
request->length = length;
request->offset = offset;
request->callback = std::move(callback);
request->submit_time = std::chrono::steady_clock::now();
return submitRequest(std::move(request));
}
这里创建了一个 IORequest 对象,记录了所有必要的信息。特别注意 submit_time 字段,它用于后续计算 IO 延迟。
uint64_t AsyncIOUring::submitRequest(std::unique_ptr<IORequest> request) {
if (!initialized_ || !running_) return 0;
request->req_id = generateRequestId();
std::lock_guard<std::mutex> lock(submit_mutex_);
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (!sqe) {
std::cerr << "获取 SQE 失败:队列已满" << std::endl;
return 0;
}
switch (request->op_type) {
case IOOpType::READ:
io_uring_prep_read(sqe, request->fd, request->buffer, request->length, request->offset);
break;
case IOOpType::WRITE:
io_uring_prep_write(sqe, request->fd, request->buffer, request->length, request->offset);
break;
case IOOpType::FSYNC:
io_uring_prep_fsync(sqe, request->fd, 0);
break;
default:
return 0;
}
io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(request->req_id));
uint64_t req_id = request->req_id;
{
std::lock_guard<std::mutex> req_lock(request_mutex_);
pending_requests_[req_id] = std::move(request);
}
stats_.total_requests++;
int pending = pending_submissions_.fetch_add(1, std::memory_order_relaxed) + 1;
if (pending >= BATCH_SUBMIT_SIZE) {
int ret = io_uring_submit(&ring_);
if (ret < 0) {
std::cerr << "批量提交 IO 请求失败:" << strerror(-ret) << std::endl;
}
pending_submissions_.store(0, std::memory_order_relaxed);
}
return req_id;
}
- 获取 SQE:
io_uring_get_sqe 从 SQ 中获取一个空闲的条目。如果队列已满,返回 NULL。
- 准备 SQE:根据操作类型调用相应的
io_uring_prep_* 函数。这些函数实际上只是填充 SQE 结构体的字段。
- 设置用户数据:
io_uring_sqe_set_data 将请求 ID 作为用户数据附加到 SQE 上。当 IO 完成时,CQE 会携带这个用户数据,从而可以找到对应的请求。
- 保存请求:将请求对象保存到
pending_requests_ 映射中,以便完成时能找到回调函数。
- 批量提交:只有累积足够多的请求才提交,这是性能优化的关键。
完成处理线程的实现
void AsyncIOUring::completionThread() {
struct io_uring_cqe* cqe;
struct __kernel_timespec timeout;
timeout.tv_sec = 0;
timeout.tv_nsec = 1000000;
while (running_) {
int ret = io_uring_wait_cqe_timeout(&ring_, &cqe, &timeout);
if (ret < 0) {
if (ret == -EINTR || ret == -ETIME) {
continue;
}
if (running_) {
std::cerr << "等待完成事件失败:" << strerror(-ret) << std::endl;
}
break;
}
handleCompletion(cqe);
io_uring_cqe_seen(&ring_, cqe);
unsigned head;
unsigned count = 0;
io_uring_for_each_cqe(&ring_, head, cqe) {
handleCompletion(cqe);
count++;
if (count >= 128) break;
}
if (count > 0) {
io_uring_cq_advance(&ring_, count);
}
completion_cv_.notify_all();
}
}
1. 超时等待:使用 1ms 超时而不是无限等待。这样可以在析构时能够及时退出线程,避免死锁。
2. 批量处理:不是处理一个 CQE 就立即返回,而是尝试一次性处理所有可用的 CQE。io_uring_for_each_cqe 是一个宏,它遍历 CQ 中所有已就绪的条目。这样做的好处是:
- 减少循环次数
- 减少系统调用(
io_uring_cq_advance 一次性标记多个 CQE 为已读)
- 提高吞吐量
3. 限制批量大小:最多一次处理 128 个 CQE。这是为了避免在极高负载下,完成线程长时间占用 CPU,影响其他线程的运行。
void AsyncIOUring::handleCompletion(struct io_uring_cqe* cqe) {
uint64_t req_id = reinterpret_cast<uint64_t>(io_uring_cqe_get_data(cqe));
if (req_id == 0) return;
std::unique_ptr<IORequest> request;
{
std::lock_guard<std::mutex> lock(request_mutex_);
auto it = pending_requests_.find(req_id);
if (it == pending_requests_.end()) return;
request = std::move(it->second);
pending_requests_.erase(it);
}
ssize_t result = cqe->res;
int error = (result < 0) ? -result : 0;
uint64_t completed = stats_.completed_requests.fetch_add(1, std::memory_order_relaxed) + 1;
if (error != 0) {
stats_.failed_requests.fetch_add(1, std::memory_order_relaxed);
} else {
if (request->op_type == IOOpType::READ) {
stats_.total_bytes_read.fetch_add(result, std::memory_order_relaxed);
} else if (request->op_type == IOOpType::WRITE) {
stats_.total_bytes_written.fetch_add(result, std::memory_order_relaxed);
}
}
if ((completed & 0x3F) == 0) {
auto now = std::chrono::steady_clock::now();
auto latency = std::chrono::duration_cast<std::chrono::microseconds>(now - request->submit_time).count();
uint64_t old_latency = stats_.avg_latency_us.load(std::memory_order_relaxed);
uint64_t new_latency = (old_latency * 9 + latency) / 10;
stats_.avg_latency_us.store(new_latency, std::memory_order_relaxed);
}
if (request->callback) {
try {
request->callback(result, error);
} catch (const std::exception& e) {
std::cerr << "回调函数异常:" << e.what() << std::endl;
}
}
}
1. 锁粒度优化:只在查找和删除请求时持有锁,然后立即释放。回调函数的执行是在锁外进行的,这样可以:
- 避免回调函数中的耗时操作阻塞其他完成事件的处理
- 避免回调函数中调用
submitRead/submitWrite 导致的死锁(因为它们也需要获取锁)
2. 延迟采样统计:计算每个请求的延迟需要调用 std::chrono::steady_clock::now(),这个操作在某些 CPU 上可能需要几十纳秒。为了减少开销,代码只在每 64 个请求时采样一次。使用位运算 (completed & 0x3F) == 0 来判断是否是 64 的倍数,这比 completed % 64 == 0 快得多。
3. 指数移动平均:使用简单的 (old * 9 + new) / 10 公式来计算平均延迟。这比维护一个延迟历史队列要高效得多,同时对最近的数据赋予更高的权重。
3.2 AsyncFileIO:易用的高层封装
AsyncFileIO 类在 AsyncIOUring 的基础上,提供了更符合日常使用习惯的文件操作接口。
文件句柄管理
class AsyncFileHandle {
public:
AsyncFileHandle(int fd, const std::string& path)
: fd_(fd), path_(path), offset_(0) {}
~AsyncFileHandle() {
if (fd_ >= 0) close(fd_);
}
int getFd() const { return fd_; }
const std::string& getPath() const { return path_; }
off_t getOffset() const { return offset_; }
void setOffset(off_t offset) { offset_ = offset; }
private:
int fd_;
std::string path_;
off_t offset_;
};
这个类使用 RAII(Resource Acquisition Is Initialization)模式,确保文件描述符在对象销毁时自动关闭。同时维护了当前的文件偏移量,这样用户不需要手动跟踪读写位置。
文件打开与优化
std::shared_ptr<AsyncFileHandle> AsyncFileIO::openFile(const std::string& path, FileOpenMode mode) {
std::lock_guard<std::mutex> lock(file_mutex_);
int flags = modeToFlags(mode);
if (use_direct_io_) flags |= O_DIRECT;
int fd = open(path.c_str(), flags, 0644);
if (fd < 0) {
std::cerr << "打开文件失败 [" << path << "]: " << strerror(errno) << std::endl;
return nullptr;
}
if (!use_direct_io_ && (mode == FileOpenMode::READ_ONLY || mode == FileOpenMode::READ_WRITE)) {
posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
}
return std::make_shared<AsyncFileHandle>(fd, path);
}
这里的 posix_fadvise 调用非常重要。POSIX_FADV_SEQUENTIAL 标志告诉内核:
- 这个文件会被顺序读取
- 应该激进地进行预读(read-ahead),提前将后续数据加载到页缓存
- 已经读过的页面可以快速释放,因为不太可能再次访问
异步读写的包装
uint64_t AsyncFileIO::readAsync(std::shared_ptr<AsyncFileHandle> handle, void* buffer, size_t length, std::function<void(ssize_t, int)> callback) {
if (!handle) return 0;
off_t offset = handle->getOffset();
auto wrapped_callback = [handle, offset, length, callback](ssize_t result, int error) {
if (result > 0) handle->setOffset(offset + result);
if (callback) callback(result, error);
};
return io_uring_->submitRead(handle->getFd(), buffer, length, offset, wrapped_callback);
}
这里使用了回调包装(callback wrapping)技巧:
- 捕获当前的文件偏移量
- 创建一个新的回调函数,它会在 IO 完成后自动更新偏移量
- 然后调用用户的回调函数
同步接口的实现
std::future<ssize_t> AsyncFileIO::readSync(std::shared_ptr<AsyncFileHandle> handle, void* buffer, size_t length) {
auto promise = std::make_shared<std::promise<ssize_t>>();
std::future<ssize_t> future = promise->get_future();
readAsync(handle, buffer, length, [promise](ssize_t result, int error) {
if (error != 0) promise->set_value(-error);
else promise->set_value(result);
});
return future;
}
- 创建一个
std::promise 和对应的 std::future
- 在异步回调中,将结果通过
promise->set_value() 传递
- 返回
future 给用户,用户可以调用 future.get() 来阻塞等待结果
批量 IO 操作
void AsyncFileIO::readBatch(std::shared_ptr<AsyncFileHandle> handle,
const std::vector<void*>& buffers,
const std::vector<size_t>& lengths,
std::function<void(std::vector<ssize_t>)> callback) {
if (buffers.size() != lengths.size()) return;
auto results = std::make_shared<std::vector<ssize_t>>(buffers.size(), 0);
auto counter = std::make_shared<std::atomic<size_t>>(buffers.size());
for (size_t i = 0; i < buffers.size(); ++i) {
readAsync(handle, buffers[i], lengths[i], [results, counter, callback, i](ssize_t result, int error) {
(*results)[i] = (error != 0) ? -error : result;
if (counter->fetch_sub(1) == 1) {
if (callback) callback(*results);
}
});
}
}
- 创建一个共享的结果向量和计数器
- 为每个子操作提交异步请求
- 在每个子操作的回调中,保存结果并递减计数器
- 当计数器归零时(所有子操作都完成),调用用户的回调函数
注意这里使用 std::shared_ptr 来管理 results 和 counter,这样可以确保即使某些回调延迟执行,这些对象也不会被提前销毁。
3.3 AsyncLogger:生产级异步日志系统
AsyncLogger 是本项目的实战应用示例,展示了如何使用异步 IO 构建一个高性能的日志系统。
核心数据结构
class AsyncLogger {
private:
std::unique_ptr<AsyncFileIO> file_io_;
std::shared_ptr<AsyncFileHandle> current_file_;
std::string log_dir_;
size_t max_file_size_;
std::atomic<size_t> current_file_size_;
std::vector<char> buffer_;
size_t buffer_pos_;
std::mutex buffer_mutex_;
std::atomic<uint64_t> total_logs_;
std::atomic<uint64_t> total_bytes_;
std::thread flush_thread_;
std::atomic<bool> running_;
};
- 内部缓冲区:
buffer_ 用于暂存日志,避免每条日志都触发一次 IO
- 原子计数器:使用
std::atomic 来统计日志数量和字节数,避免锁竞争
- 定期刷新线程:
flush_thread_ 每隔 1 秒自动将缓冲区刷新到磁盘
日志写入流程
void AsyncLogger::log(LogLevel level, const std::string& message) {
std::stringstream ss;
ss << "[" << getCurrentTimestamp() << "] "
<< "[" << getLevelString(level) << "] " << message << "\n";
std::string log_line = ss.str();
std::lock_guard<std::mutex> lock(buffer_mutex_);
if (buffer_pos_ + log_line.size() > buffer_.size()) flushBuffer();
memcpy(buffer_.data() + buffer_pos_, log_line.c_str(), log_line.size());
buffer_pos_ += log_line.size();
total_logs_++;
}
这里的关键设计是先缓冲,再批量写入。每条日志只是追加到内存缓冲区,不触发 IO。只有当:
才会真正写入磁盘。这大大减少了 IO 次数,提高了性能。
void AsyncLogger::flushBuffer() {
if (!current_file_ || buffer_pos_ == 0) return;
if (current_file_size_.load(std::memory_order_relaxed) + buffer_pos_ > max_file_size_) {
rotateLogFile();
}
auto data_copy = std::make_shared<std::vector<char>>(buffer_.begin(), buffer_.begin() + buffer_pos_);
size_t size = buffer_pos_;
file_io_->writeAsync(current_file_, data_copy->data(), size, [this, size, data_copy](ssize_t result, int error) {
if (error == 0 && result > 0) {
current_file_size_.fetch_add(result, std::memory_order_relaxed);
total_bytes_.fetch_add(result, std::memory_order_relaxed);
}
});
buffer_pos_ = 0;
}
1. 数据拷贝:由于 writeAsync 是异步的,它可能在 flushBuffer 返回后才真正执行。因此不能直接传递 buffer_ 的指针,必须拷贝一份数据。这里使用 std::shared_ptr<std::vector<char>>,并在回调中捕获它,确保数据在 IO 完成前不会被销毁。
2. 原子操作:current_file_size_ 使用 std::atomic,因为它会在异步回调中被更新,而回调可能在不同的线程执行。
日志文件轮转
void AsyncLogger::rotateLogFile() {
auto now = std::time(nullptr);
auto tm = std::localtime(&now);
std::stringstream ss;
ss << log_dir_ << "/log_" << std::put_time(tm, "%Y%m%d_%H%M%S") << ".log";
std::string new_log_path = ss.str();
if (current_file_) {
file_io_->flushSync(current_file_).get();
current_file_.reset();
}
current_file_ = file_io_->openFile(new_log_path, FileOpenMode::APPEND);
current_file_size_.store(0, std::memory_order_relaxed);
std::cout << "日志轮转:" << new_log_path << std::endl;
}
当日志文件达到设定的最大大小(默认 100MB)时,会自动创建一个新文件。文件名包含时间戳,便于归档和查找。
四、示例程序深度解读
4.1 database_sim.cpp:模拟数据库存储引擎
这个示例展示了如何使用异步 IO 构建一个简单的键值存储引擎,类似于 RocksDB 或 LevelDB 的简化版本。
记录结构设计
constexpr size_t KEY_SIZE = 32;
constexpr size_t VALUE_SIZE = 256;
constexpr size_t RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
struct Record {
char key[KEY_SIZE];
char value[VALUE_SIZE];
Record() {
memset(key, 0, KEY_SIZE);
memset(value, 0, VALUE_SIZE);
}
Record(const std::string& k, const std::string& v) {
memset(key, 0, KEY_SIZE);
memset(value, 0, VALUE_SIZE);
strncpy(key, k.c_str(), KEY_SIZE - 1);
strncpy(value, v.c_str(), VALUE_SIZE - 1);
}
};
采用固定大小的记录格式,这样可以通过简单的计算直接定位任意记录的文件偏移量:offset = record_index * RECORD_SIZE。这是许多数据库索引结构(如 B+ 树)的基础。
索引管理
class SimpleKVStore {
private:
std::unordered_map<std::string, off_t> index_;
size_t record_count_;
void loadIndex() {
struct stat st;
if (stat(db_path_.c_str(), &st) != 0) return;
size_t file_size = st.st_size;
size_t num_records = file_size / RECORD_SIZE;
for (size_t i = 0; i < num_records; ++i) {
Record record;
data_file_->setOffset(i * RECORD_SIZE);
auto future = file_io_->readSync(data_file_, &record, RECORD_SIZE);
ssize_t result = future.get();
if (result > 0) {
std::string key = record.getKey();
if (!key.empty()) {
index_[key] = i * RECORD_SIZE;
}
}
}
record_count_ = num_records;
}
};
这是一个简化的索引实现。在初始化时,读取整个文件,将每个记录的 key 和其文件偏移量建立映射。真实的数据库会使用更复杂的结构(如 B+ 树)来避免加载整个索引到内存。
批量写入
void SimpleKVStore::batchWrite(const std::vector<Record>& records, std::function<void(bool)> callback) {
auto start_time = std::chrono::high_resolution_clock::now();
std::vector<const void*> buffers;
std::vector<size_t> lengths;
off_t start_offset = record_count_ * RECORD_SIZE;
for (const auto& record : records) {
auto* record_copy = new Record(record);
buffers.push_back(record_copy);
lengths.push_back(RECORD_SIZE);
std::string key = record.getKey();
off_t offset = start_offset + (buffers.size() - 1) * RECORD_SIZE;
index_[key] = offset;
}
record_count_ += records.size();
file_io_->writeBatch(data_file_, buffers, lengths, [this, buffers, start_time, callback](std::vector<ssize_t> results) {
for (auto* buffer : buffers) delete static_cast<const Record*>(buffer);
bool success = true;
size_t total_written = 0;
for (auto result : results) {
if (result < 0) { success = false; break; }
total_written += result;
}
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
if (success) {
std::cout << "批量写入完成:" << results.size() << " 条记录," << total_written << " 字节," << duration << " us" << std::endl;
}
if (callback) callback(success);
});
}
- 一次性提交多个 IO 请求,充分利用磁盘的并行处理能力
- 减少系统调用次数
- 在支持 NCQ(Native Command Queuing)的 SSD 上,可以显著提升性能
测试场景
void testMixedWorkload(SimpleKVStore& db, int duration_seconds) {
std::cout << "\n========== 混合读写测试 ==========" << std::endl;
std::cout << "运行时长:" << duration_seconds << " 秒" << std::endl;
std::cout << "工作负载:70% 读取,30% 写入" << std::endl;
std::atomic<bool> running{true};
std::atomic<uint64_t> total_reads{0};
std::atomic<uint64_t> total_writes{0};
std::vector<std::thread> workers;
int num_workers = 4;
for (int i = 0; i < num_workers; ++i) {
workers.emplace_back([&, i]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> op_dis(0, 99);
std::uniform_int_distribution<> id_dis(0, 10000);
while (running) {
int op = op_dis(gen);
if (op < 70) {
std::string key = "key_" + std::to_string(id_dis(gen));
db.read(key);
total_reads++;
} else {
std::vector<Record> records;
records.push_back(generateRandomRecord(id_dis(gen)));
db.batchWrite(records, [](bool success) {});
total_writes++;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
});
}
std::thread monitor([&]() {
uint64_t last_reads = 0;
uint64_t last_writes = 0;
while (running) {
std::this_thread::sleep_for(std::chrono::seconds(1));
uint64_t current_reads = total_reads.load();
uint64_t current_writes = total_writes.load();
uint64_t reads_per_sec = current_reads - last_reads;
uint64_t writes_per_sec = current_writes - last_writes;
last_reads = current_reads;
last_writes = current_writes;
std::cout << "读取:" << reads_per_sec << " ops/sec, " << "写入:" << writes_per_sec << " ops/sec" << std::endl;
}
});
std::this_thread::sleep_for(std::chrono::seconds(duration_seconds));
running = false;
for (auto& worker : workers) worker.join();
monitor.join();
}
- 70% 的操作是读取(模拟查询)
- 30% 的操作是写入(模拟插入/更新)
- 4 个工作线程并发访问
- 实时监控每秒的操作数
4.2 high_freq_logger.cpp:对比同步/异步性能
同步日志基准
void synchronousLoggingTest(int num_logs, int num_threads) {
std::cout << "\n========== 同步 IO 日志写入测试 ==========" << std::endl;
auto start_time = std::chrono::high_resolution_clock::now();
FILE* log_file = fopen("./logs/sync_test.log", "w");
std::vector<std::thread> threads;
std::mutex file_mutex;
for (int t = 0; t < num_threads; ++t) {
threads.emplace_back([&, t]() {
int logs_per_thread = num_logs / num_threads;
for (int i = 0; i < logs_per_thread; ++i) {
auto log = generateRandomLog();
std::string log_str = formatLog(log) + "\n";
std::lock_guard<std::mutex> lock(file_mutex);
fwrite(log_str.c_str(), 1, log_str.size(), log_file);
}
});
}
for (auto& thread : threads) thread.join();
fflush(log_file);
fclose(log_file);
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
std::cout << "完成时间:" << duration << " ms" << std::endl;
std::cout << "吞吐量:" << (num_logs * 1000.0 / duration) << " logs/sec" << std::endl;
}
- 每次
fwrite 都可能触发一次系统调用
- 多线程需要用锁保护文件指针,导致线程间竞争
- 业务线程被 IO 阻塞,无法继续处理请求
异步日志实现
void asynchronousLoggingTest(int num_logs, int num_threads) {
std::cout << "\n========== 异步 IO 日志写入测试 ==========" << std::endl;
auto start_time = std::chrono::high_resolution_clock::now();
AsyncLogger logger("./logs", 50 * 1024 * 1024, 64 * 1024);
if (!logger.initialize()) {
std::cerr << "初始化异步日志器失败" << std::endl;
return;
}
std::vector<std::thread> threads;
std::atomic<int> completed_logs{0};
for (int t = 0; t < num_threads; ++t) {
threads.emplace_back([&, t]() {
int logs_per_thread = num_logs / num_threads;
for (int i = 0; i < logs_per_thread; ++i) {
auto log = generateRandomLog();
std::string log_str = formatLog(log);
logger.log(AsyncLogger::LogLevel::INFO, log_str);
completed_logs++;
}
});
}
for (auto& thread : threads) thread.join();
logger.flush();
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
std::cout << "完成时间:" << duration << " ms" << std::endl;
std::cout << "吞吐量:" << (num_logs * 1000.0 / duration) << " logs/sec" << std::endl;
}
- 业务线程只需将日志追加到内存缓冲区,立即返回(几十纳秒)
- 实际的 IO 在后台线程批量执行,减少系统调用
- 无锁设计(只在操作缓冲区时需要锁)
在实测中,异步方案的吞吐量是同步方案的3-5 倍。
总结
io_uring 作为 Linux 内核的最新异步 IO 接口,代表了未来的方向。它不仅支持文件 IO,还支持网络 IO、定时器、信号等几乎所有的 IO 操作。无论你是想为自己的项目添加高性能 IO 能力,还是希望深入学习系统编程和性能优化,这都是一个非常适合的项目!
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online