1. 连接管理
在客户端这边,RabbitMQ 弱化了客户端的概念,因为用户所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这一流程。
这个模块同样是针对 muduo 库客户端连接的二次封装,向用户提供创建 channel 信道的接口,创建信道后,可以通过信道来获取指定服务。
#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__
#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"
#include "channel.hpp"
#include "worker.hpp"
namespace rabbitmq {
class Connection {
public:
using ptr = std::shared_ptr<Connection>;
Connection(const std::string server_ip, int server_port, const AsyncWorker::ptr& worker)
: _latch(1),
_client(worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), "Client"),
_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
_worker(worker),
_channel_manager(std::make_shared<ChannelManager>()) {
_dispatcher.registerMessageCallback<rabbitmq::basicCommonResponse>(std::bind(&Connection::basicResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<rabbitmq::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
_client.connect();
_latch.wait(); // 阻塞等待,直到连接建立成功
}
Channel::ptr openChannel() {
Channel::ptr channel = _channel_manager->create(_conn, _codec);
bool ret = channel->openChannel();
if (ret == false) {
DLOG("打开信道失败!");
return Channel::ptr();
}
return channel;
}
void closeChannel(const Channel::ptr& channel) {
channel->closeChannel();
_channel_manager->remove(channel->cid());
}
private:
void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp) {
// 1. 找到信道
Channel::ptr channel = _channel_manager->get(message->cid());
if (channel.get() == nullptr) {
DLOG("未找到信道信息!");
return;
}
// 2. 将得到的响应对象,添加到信道的基础响应 hash_map 中
channel->putBasicResponse(message);
}
void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp) {
// 1. 找到信道
Channel::ptr channel = _channel_manager->get(message->cid());
if (channel.get() == nullptr) {
DLOG("未找到信道信息!");
return;
}
// 2. 封装异步任务(消息处理任务),抛入线程池
_worker->_pool.push([channel, message]() { channel->consume(message); });
}
void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp) {
LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
conn->shutdown();
}
// 连接建立成功时候的回调函数,连接建立成功后,唤醒上边的阻塞
void onConnection(const muduo::net::TcpConnectionPtr &conn) {
if (conn->connected()) {
_latch.countDown(); // 唤醒主线程中的阻塞
_conn = conn;
} else {
// 连接关闭时的操作
_conn.reset();
}
}
private:
muduo::CountDownLatch _latch; // 实现同步的
muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接
muduo::net::TcpClient _client; // 客户端
ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数
ProtobufCodecPtr _codec; // protobuf 协议处理器--针对收到的请求数据进行 protobuf 协议处理
AsyncWorker::ptr _worker;
ChannelManager::ptr _channel_manager;
};
}
#endif
这段代码是 RabbitMQ 客户端的连接管理模块,它负责与 RabbitMQ 服务器建立连接,并管理信道(Channel)的创建和关闭。主要功能包括:
- 封装 muduo 库的 TcpClient,建立与服务器的 TCP 连接。
- 使用 ProtobufCodec 进行消息的编解码,使用 ProtobufDispatcher 进行消息的分发。
- 管理多个信道(Channel),每个信道可以执行不同的 AMQP 操作(如声明交换机、队列,发布消息等)。
- 异步处理服务器推送的消息(如 basicConsumeResponse),通过线程池处理消息消费。
关键点:
- 连接建立:在构造函数中,通过 TcpClient 连接服务器,并使用 CountDownLatch 等待连接建立成功。连接建立成功后,onConnection 回调函数会被调用,唤醒主线程。
- 消息分发:通过 ProtobufDispatcher 注册了两类消息的回调函数:basicCommonResponse(普通响应)和 basicConsumeResponse(消费消息响应)。当收到服务器消息时,根据消息类型分发给对应的处理函数。
- 信道管理:通过 ChannelManager 管理信道的创建和删除。每个信道有一个唯一的 cid(信道 ID),在创建信道时生成。当收到响应消息时,根据 cid 找到对应的信道,将响应放入信道的响应映射表中,等待信道处理。
- 异步消息处理:对于消费消息(basicConsumeResponse),将其封装成任务提交到线程池中,由工作线程调用信道的 consume 方法处理消息(即调用用户注册的回调函数)。
- 连接关闭:当收到未知消息时,会关闭连接。另外,当服务器断开连接时,也会触发 onConnection 回调,重置连接指针。
响应处理
- 基础响应(basicCommonResponse):直接找到对应的信道,并将响应放入信道的响应映射表中,由信道中等待响应的线程处理(通过条件变量唤醒)。
- 消费响应(basicConsumeResponse):找到对应的信道,然后将消息处理任务提交到线程池,由工作线程执行信道的
consume方法(最终调用用户注册的回调函数)。
2. 搭建客户端

发布消息的生产者客户端
#include "connection.hpp"
int main() {
// 1. 实例化异步工作线程对象
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
// 2. 实例化连接对象
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
// 3. 通过连接创建信道
rabbitmq::Channel::ptr channel = conn->openChannel();
// 4. 通过信道提供的服务完成所需
// 4.1. 声明一个交换机 exchange1,交换机类型为广播模式
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);
// 4.2. 声明一个队列 queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 4.3. 声明一个队列 queue2
channel->declareQueue("queue2", true, false, false, tmp_map);
// 4.4. 绑定 queue1-exchange1,且 binding_key 设置为 queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 4.5. 绑定 queue2-exchange1, 且 binding_key 设置为 news.music.#
// channel->queueBind("exchange1", "queue2", "news.music.#");
// 5. 循环向交换机发布消息
for (int i = 0; i < 10; i++) {
channel->(, , + std::(i));
}
conn->(channel);
;
}
订阅消息的消费者客户端
#include "connection.hpp"
void cb(rabbitmq::Channel::ptr &channel, const std::string consumer_tag, const rabbitmq::BasicProperties *bp, const std::string &body) {
std::cout << consumer_tag << "消费了消息:" << body << std::endl;
channel->basicAck(bp->id());
}
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cout << "usage: ./consume_client queue1\n";
return -1;
}
// 1. 实例化异步工作线程对象
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
// 2. 实例化连接对象
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
// 3. 通过连接创建信道
rabbitmq::Channel::ptr channel = conn->openChannel();
// 4. 通过信道提供的服务完成所需
// 4.1. 声明一个交换机 exchange1,交换机类型为广播模式
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);
// 4.2. 声明一个队列 queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 4.3. 声明一个队列 queue2
channel->declareQueue(, , , , tmp_map);
channel->(, , );
channel->(, , );
functor = std::(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->(, argv[], , functor);
() std::this_thread::(std::chrono::());
conn->(channel);
;
}
一、生产者客户端流程
生产者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后向交换机发布消息。
步骤详解:
-
实例化异步工作线程对象:
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();创建异步工作线程对象,该对象包含一个 EventLoopThread(用于网络 IO)和一个线程池(用于处理消息)。
-
实例化连接对象:
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);创建 Connection 对象,传入服务器 IP 和端口以及异步工作线程对象。在构造函数中,会建立 TCP 连接,并且阻塞直到连接成功。
-
通过连接创建信道:
rabbitmq::Channel::ptr channel = conn->openChannel();通过连接对象创建一个信道。在 openChannel 方法中,会通过 ChannelManager 创建 Channel 对象,并向服务器发送打开信道的请求,等待响应。
-
通过信道提供的服务完成所需操作: 声明一个交换机 exchange1,类型为广播模式(FANOUT):
channel->declareExchange("exchange1", rabbitmq::ExchangeType::FANOUT, true, false, tmp_map);声明两个队列 queue1 和 queue2:
channel->declareQueue("queue1", true, false, false, tmp_map); channel->declareQueue("queue2", true, false, false, tmp_map);绑定队列到交换机,并设置绑定键(binding key):
channel->queueBind("exchange1", "queue1", "queue1"); channel->queueBind("exchange1", "queue2", "news.music.#"); -
循环向交换机发布消息:
for (int i = 0; i < ; i++) { channel->(, , + std::(i)); }
二、消费者客户端流程
消费者客户端的主要流程是建立连接、创建信道、声明交换机和队列、绑定队列到交换机,然后订阅队列中的消息,并处理消息。
步骤详解:
-
实例化异步工作线程对象:同生产者。
-
实例化连接对象:同生产者。
-
通过连接创建信道:同生产者。
-
通过信道提供的服务完成所需操作:声明交换机和队列、绑定队列,与生产者相同。
-
订阅队列消息:
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel->basicConsume("consumer1", argv[1], false, functor);创建一个回调函数,然后调用 channel 的 basicConsume 方法订阅队列。这里传入消费者标签为 "consumer1",队列名由命令行参数指定,自动确认为 false(即手动确认),以及回调函数。
-
循环等待消息:
while (1) std::this_thread::sleep_for(std::chrono::seconds(3));主线程循环等待,因为消息的处理是异步的(在工作线程池中),所以这里需要保持程序运行。
-
关闭信道:
conn->closeChannel(channel);注意:这里是通过连接对象来关闭信道,实际上内部会调用 channel 的 closeChannel 方法,并从 ChannelManager 中移除。
-
程序结束:同样,连接和异步工作线程对象会随着智能指针的释放而自动清理。
三、消息处理流程
当消费者客户端订阅队列后,服务器会推送消息到客户端。客户端的网络线程(EventLoopThread)接收到消息后,通过 ProtobufCodec 解码,然后由 ProtobufDispatcher 根据消息类型分发。
对于消费消息(basicConsumeResponse),会调用 Connection::consumeResponse,然后在线程池中执行 Channel::consume 方法,最终调用用户注册的回调函数。
在回调函数中,用户处理消息,并手动确认消息(basicAck)。确认消息会发送一个请求到服务器,服务器收到后从队列中删除消息。
3. 功能联调
广播模式下的测试
分别编译运行:server、consume_client queue1、consume_client queue2、publish_client
结果如下:
user@mq-server:~/rabbit-mq/mqserver$ ./server
2026020314:42:57.353717Z 2752283 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:36086 - TcpServer.cc:80
[DBG][22:42:57][channel.hpp:43] new Channel: 0x5580b78cdfd0
[DBG][22:42:57][connection.hpp:26] b34ba8ee-60be-beb1-0000-000000000001 信道创建成功!
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
2026020314:42:57.353236Z 2757934 INFO TcpClient::TcpClient[Client] - connector 0x56352AF5F770 - TcpClient.cc:69
2026020314:42:57.353557Z 2757934 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
在运行发布消息的生产者客户端(publish_client)时,我们应该可以看到消费者客户端会消费消息,并通过回调函数将消息打印出来,但是我们在消费者客户端没有看到任何消息,说明我们代码中存在一点小问题。

经过调试发现,在服务端的连接管理模块实现时,我们在调用打开信道的接口时的传参应该插入信道 id,结果因为写错了,写成了请求 id。再次编译运行,发现还是和刚刚一样。
这里我们在对网络通信进行二次封装时(信道管理模块和连接管理模块),我们是没有进行测试的,所以我们大致方向应该就是在网络通信时出现了问题,在服务器的日志信息中可以看到服务器在创建信道时,成功将信道创建,但是客户端我们不知道有没有在收到服务器响应,并且在收到响应后有没有将其加入到 hash_map 中管理起来,所以我们可以在客户端添加一些日志。
我们可以在客户端创建信道,声明交换机/队列,删除交换机/队列等请求时添加日志,看客户端是否正常发送请求以及接收响应。

同样在服务端我们也添加一些日志,看看信道创建成功之后到底有没有给客户端响应。

运行结果如下:
user@mq-server:~/rabbit-mq/mqserver$ ./server
20260204 06:19:33.333944Z 3906265 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:35632 - TcpServer.cc:80
[DBG][14:19:33][channel.hpp:43] new Channel: 0x55aedc750ad0
[DBG][14:19:33][connection.hpp:26] d5ba2ba1-5824-a417-0000-000000000001 信道创建成功!
[DBG][14:19:33][connection.hpp:29] 客户端创建信道已响应
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
20260204 06:19:33.333532Z 3906405 INFO TcpClient::TcpClient[Client] - connector 0x56242E8A9770 - TcpClient.cc:69
20260204 06:19:33.333756Z 3906405 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
[DBG][14:19:33][channel.hpp:39] 客户端声明信道请求已发送
可以看到服务端响应了客户端创建信道的请求,但是客户端在接收到服务端响应后应该打印一条日志 '客户端声明信道请求,服务器已响应',但是我们并没有在客户端中看到这条日志消息,说明我们在等待服务器响应时出现了问题。

但是我们发现等待响应的接口并没有问题,那就有可能是客户端在接收服务器响应时,有没有将其加入到 hash_map 中管理起来。

客户端在接收到服务器的普通响应时会调用 basicResponse 接口去处理这个响应(因为我们在构造函数中通过分发器注册了这个接口),那么我们就需要查看一下这个处理响应的接口有没有问题。

可以看到处理服务器响应的接口也没有问题,但是内部调用了将响应对象添加到 hash_map 中管理起来的接口,所以我们再继续往下看,看也没有正确管理。

我们发现原来将服务器的响应添加到哈希表中管理起来时,我们不是通过信道 id 来查找响应的,应该是通过请求 id 来查找,应该是当时不小心敲错了代码造成的。
因此,在等待响应时由于我们添加到哈希表中的键值是 [信道 id,响应对象],但是我们在条件变量处阻塞的条件是通过 请求 id 来查找对应的响应对象,所以不可能找得到,于是条件不满足就会一直阻塞在这。
下面我们修改好了这个问题后再来编译运行一下:
user@mq-server:~/rabbit-mq/mqserver$ ./server
20260204 07:17:54.963281Z 3961554 INFO TcpServer::newConnection [Server] - new connection [Server-0.0.0.0:8085#1] from 127.0.0.1:48728 - TcpServer.cc:80
[DBG][15:17:54][channel.hpp:43] new Channel: 0x55badb5aaad0
[DBG][15:17:54][connection.hpp:26] 462a9d0e-e95b-e896-0000-000000000001 信道创建成功!
[DBG][15:17:54][connection.hpp:29] 客户端创建信道已响应
[DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55badb602f30
[DBG][15:17:54][channel.hpp:153] 创建消费者成功
...
user@mq-client:~/rabbit-mq/mqclient$ ./consume_client queue1
20260204 07:17:54.962859Z 3961827 INFO TcpClient::TcpClient[Client] - connector 0x55648BB74770 - TcpClient.cc:69
20260204 07:17:54.963116Z 3961827 INFO TcpClient::connect[Client] - connecting to 127.0.0.1:8085 - TcpClient.cc:107
[DBG][15:17:54][channel.hpp:39] 客户端声明信道请求已发送
[DBG][15:17:54][channel.hpp:42] 客户端声明信道请求,服务器已响应
[DBG][15:17:54][channel.hpp:79] 客户端声明交换机请求,服务器已响应
[DBG][15:17:54][channel.hpp:117] 客户端声明队列请求,服务器已响应
[DBG][15:17:54][channel.hpp:117] 客户端声明队列请求,服务器已响应
[DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求,服务器已响应
[DBG][15:17:54][channel.hpp:151] 客户端队列绑定请求,服务器已响应
[DBG][15:17:54][consumer.hpp:32] new Consumer: 0x55648bb6c500
consumer1 消费了消息:Hello World-0
consumer1 消费了消息:Hello World-1
...
consumer1 消费了消息:Hello World-9
问题解决,可以看到运行结果符合预期。
避免日志太多干扰,后续就将日志给注释掉。
直接交换模式下的测试
生产者客户端:
#include "connection.hpp"
int main() {
// 1. 实例化异步工作线程对象
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
// 2. 实例化连接对象
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
// 3. 通过连接创建信道
rabbitmq::Channel::ptr channel = conn->openChannel();
// 4. 通过信道提供的服务完成所需
// 4.1. 声明一个交换机 exchange1,交换机类型为直接交换模式
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::DIRECT, true, false, tmp_map);
// 4.2. 声明一个队列 queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 4.3. 声明一个队列 queue2
channel->declareQueue("queue2", true, false, false, tmp_map);
// 4.4. 绑定 queue1-exchange1,且 binding_key 设置为 queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 4.5. 绑定 queue2-exchange1, 且 binding_key 设置为 news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
// 5. 循环向交换机发布消息
for (int i = ; i < ; i++) {
rabbitmq::BasicProperties bp;
bp.(rabbitmq::UUIDHelper::());
bp.(rabbitmq::DeliveryMode::DURABLE);
bp.();
channel->(, &bp, + std::(i));
}
conn->(channel);
;
}
消费者客户端只需要修改交换机类型为直接交换模式。
运行结果:

可以看到直接交换模式下 routing_key 和 queue1 的 binding_key 匹配才能消费消息。
主题交换模式下的测试
生产者客户端:
#include "connection.hpp"
int main() {
// 1. 实例化异步工作线程对象
rabbitmq::AsyncWorker::ptr awp = std::make_shared<rabbitmq::AsyncWorker>();
// 2. 实例化连接对象
rabbitmq::Connection::ptr conn = std::make_shared<rabbitmq::Connection>("127.0.0.1", 8085, awp);
// 3. 通过连接创建信道
rabbitmq::Channel::ptr channel = conn->openChannel();
// 4. 通过信道提供的服务完成所需
// 4.1. 声明一个交换机 exchange1,交换机类型为主题交换模式
google::protobuf::Map<std::string, std::string> tmp_map;
channel->declareExchange("exchange1", rabbitmq::ExchangeType::TOPIC, true, false, tmp_map);
// 4.2. 声明一个队列 queue1
channel->declareQueue("queue1", true, false, false, tmp_map);
// 4.3. 声明一个队列 queue2
channel->declareQueue("queue2", true, false, false, tmp_map);
// 4.4. 绑定 queue1-exchange1,且 binding_key 设置为 queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 4.5. 绑定 queue2-exchange1, 且 binding_key 设置为 news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
// 5. 循环向交换机发布消息
for (int i = ; i < ; i++) {
rabbitmq::BasicProperties bp;
bp.(rabbitmq::UUIDHelper::());
bp.(rabbitmq::DeliveryMode::DURABLE);
bp.();
channel->(, &bp, + std::(i));
}
rabbitmq::BasicProperties bp;
bp.(rabbitmq::UUIDHelper::());
bp.(rabbitmq::DeliveryMode::DURABLE);
bp.();
channel->(, &bp, );
bp.();
channel->(, &bp, );
conn->(channel);
;
}
消费者客户端只需要改变交换机类型为主题交换模式。

可以看到只有符合匹配规则的队列消息才会被消费。
4. 项目总结
首先明确我们所实现的项目:仿 RabbitMQ 实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能。其次项目中所用到的技术:基于 muduo 库实现底层网络通信服务器和客户端的搭建,在应用层基于 protobuf 协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于 AMQP 模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用 gtest 框架进行单元测试,完成项目的最终实现。


