跳到主要内容RabbitMQ 与 AMQP-CPP 环境搭建及二次封装 | 极客日志C++
RabbitMQ 与 AMQP-CPP 环境搭建及二次封装
在即时通讯项目中搭建 RabbitMQ 消息中间件及使用 AMQP-CPP C++ 库的过程。内容包括 RabbitMQ 的功能优势、安装配置步骤、AMQP-CPP 的安装方法以及三个具体的代码示例(基础收发、分离发送接收、交换机绑定)。此外,文章还展示了如何对 AMQP-CPP 进行二次封装,实现连接管理、组件声明、消息发布与消费的统一接口,并详细解释了路由键匹配机制及直连交换机的测试验证。通过 libev 事件循环与多线程结合,确保了高性能异步通信的实现。
虚拟内存5 浏览 RabbitMQ 是一个消息中间件,你可以把它理解成一个专门负责接收、存储和转发消息的程序。它让不同的软件系统或者同一个系统的不同模块之间可以相互通信,但不需要直接连接对方。
- 有一个发送消息的程序,我们叫它'生产者'。生产者把消息发给 RabbitMQ。
- RabbitMQ 收到消息后,会把消息保存在一个叫'队列'的地方。
- 另一个接收消息的程序,我们叫它'消费者'。消费者从 RabbitMQ 的队列里取走消息进行处理。
- 解耦:生产者和消费者不需要知道对方的存在,也不需要同时在线。生产者只管发消息,消费者只管处理消息,它们之间通过 RabbitMQ 间接联系。
- 异步:生产者发完消息就可以继续做其他事,不用等消费者处理完。消费者可以在自己方便的时候去取消息。
- 削峰填谷:如果短时间内有大量消息涌入,RabbitMQ 可以先存起来,然后让消费者慢慢处理,避免系统被冲垮。
- 可靠:RabbitMQ 可以确保消息不丢失,即使消费者暂时宕机,消息也会留在队列里,等消费者恢复后再发送。
所以,RabbitMQ 常被用在需要可靠通信的场景,比如电商订单处理、日志收集、任务调度等。它就像是系统之间的'信使',帮忙传递信息,让整个系统更灵活、更稳定。
AMQP-CPP 是一个用于与 RabbitMQ 消息中间件通信的 C++ 库。RabbitMQ 是一个广泛使用的开源消息代理,它实现了 AMQP(高级消息队列协议)。
简单来说,RabbitMQ 负责在不同应用之间传递消息,而 AMQP-CPP 则帮助 C++ 程序与 RabbitMQ 进行交互。
- AMQP-CPP 的核心职责是处理 AMQP 协议本身,即解析从 RabbitMQ 接收到的数据和构造要发送给 RabbitMQ 的数据包。但它并不负责建立和维护网络连接——这意味着实际的 TCP 连接管理需要由你(或你选择的网络库)来完成。
- 这种设计将协议处理与网络 I/O 分离,带来了极大的灵活性:你可以将 AMQP-CPP 集成到任何已有的异步网络框架中(如 libevent、libuv、Boost.Asio 等),也可以直接使用库自带的简易 TCP 模块快速上手。
- AMQP-CPP 完全采用异步设计,内部没有阻塞式的系统调用,也不依赖多线程。它通过事件驱动的方式工作:当网络数据到达时,你将其喂给 AMQP-CPP,库解析后通过回调通知你的业务逻辑;当需要发送数据时,库生成相应的 AMQP 帧,你负责通过 TCP 连接发送出去。这种模型非常适合构建高性能、低延迟的消息应用,且能很好地与单线程事件循环配合。
- AMQP-CPP 需要编译器支持 C++17 标准。这是因为库内部使用了现代 C++ 的特性(如 std::variant、std::optional 等)来实现类型安全和高效的接口。
RabbitMQ 和 AMQP-CPP 的安装
sudo apt install -y rabbitmq-server
sudo systemctl start rabbitmq-server.service
sudo systemctl status rabbitmq-server.service
sudo systemctl enable rabbitmq-server
安装完成后默认有一个 guest 用户,但权限不足,无法用于远程登录和消息收发。
因此需要创建一个具有管理员权限的用户。
sudo rabbitmqctl add_user root 123456
sudo rabbitmqctl set_user_tags root administrator
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
sudo rabbitmq-plugins enable rabbitmq_management
我们去浏览器访问 webUI 界面,默认端口为 15672。
输入设置的 root 账号密码即可登录。
至此 RabbitMQ 安装成功。
我们这里使用 AMQP-CPP 库来编写客户端程序。
sudo apt install libev-dev
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMCPP/
make
sudo make install
注意:如果你在 make 的时候遇到了 SSL 版本相关的错误,表示 ssl 版本出现问题。解决方案是卸载当前的 ssl 库,重新进行修复安装,然后重新执行 make 即可。
示例
示例 1
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <iostream>
int main() {
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/"));
AMQP::TcpChannel channel(&connection);
channel.onReady([&channel, &connection]() {
channel.declareQueue("test_queue")
.onSuccess([&channel, &connection]() {
channel.publish("", "test_queue", "Hello from AMQP-CPP with lambda!");
std::cout << "[x] Sent a message to 'test_queue'" << std::endl;
channel.consume("test_queue")
.onReceived([&channel, &connection](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "[x] Received '" << message.body() << "'" << std::endl;
channel.ack(deliveryTag);
connection.close();
});
})
.onError([&channel, &connection](const char *message) {
std::cerr << "Queue declaration error: " << message << std::endl;
connection.close();
});
});
ev_run(loop, 0);
return 0;
}
publish_and_consume : test.cpp
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
.PHONY : clean
clean : rm -rf publish_and_consume
示例 2
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <iostream>
int main() {
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/"));
AMQP::TcpChannel channel(&connection);
channel.onReady([&channel, &connection]() {
channel.declareQueue("hello")
.onSuccess([&channel, &connection]() {
channel.publish("", "hello", "Hello World!");
std::cout << "[x] Sent 'Hello World!'" << std::endl;
connection.close();
});
});
ev_run(loop, 0);
return 0;
}
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <iostream>
int main() {
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://root:[email protected]:5672/"));
AMQP::TcpChannel channel(&connection);
channel.onReady([&channel, &connection]() {
channel.declareQueue("hello")
.onSuccess([&channel, &connection]() {
channel.consume("hello")
.onReceived([&channel, &connection](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) {
std::cout << "[x] Received '" << message.body() << "'" << std::endl;
channel.ack(deliveryTag);
connection.close();
});
});
});
ev_run(loop, 0);
return 0;
}
all : send receive
send : send.cpp
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
receive : receive.cpp
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
.PHONY : clean
clean : rm -rf send receive
由于使用了 std::cout 来打印二进制数据,终端显示时可能会出现乱码,只要不去打印即可,不影响功能。
示例 3
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main() {
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::Address address("amqp://root:[email protected]:5672/");
AMQP::TcpConnection connection(&handler, address);
AMQP::TcpChannel channel(&connection);
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 队列创建成功!" << std::endl;
});
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
for (int i = 0; i < 10; i++) {
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = channel.publish("test-exchange", "test-queue-key", msg);
if (ret == false) {
std::cout << "publish 失败!\n";
}
}
ev_run(loop, 0);
return 0;
}
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string msg;
msg.assign(message.body(), message.bodySize());
std::cout << msg << std::endl;
channel->ack(deliveryTag);
}
int main() {
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::Address address("amqp://root:[email protected]:5672/");
AMQP::TcpConnection connection(&handler, address);
AMQP::TcpChannel channel(&connection);
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 队列创建成功!" << std::endl;
});
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag")
.onReceived(callback)
.onError([](const char *message){
std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;
exit(0);
});
ev_run(loop, 0);
return 0;
}
all : publish consume
publish : publish.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
consume : consume.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
.PHONY : clean
clean : rm publish consume
二次封装
封装过程
基于上述示例,我们可以进一步对 AMQP-CPP 接口进行二次封装,以更方便快捷地实现与 RabbitMQ 的沟通。
class MQClient {
public:
using MessageCallback = std::function<void(const char*, size_t)>;
using ptr = std::shared_ptr<MQClient>;
MQClient(const std::string &user, const std::string passwd, const std::string host) {
_loop = EV_DEFAULT;
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
_loop_thread = std::thread([this]() {
ev_run(_loop, 0);
});
}
private:
struct ev_async _async_watcher;
struct ev_loop *_loop;
std::unique_ptr<AMQP::LibEvHandler> _handler;
std::unique_ptr<AMQP::TcpConnection> _connection;
std::unique_ptr<AMQP::TcpChannel> _channel;
std::thread _loop_thread;
};
针对构造函数和成员变量的设计,基本逻辑与示例一致。
析构函数涉及 libev 事件循环的异步通知机制,用于在多线程环境中安全地停止事件循环:
~MQClient() {
ev_async_init(&_async_watcher, watcher_callback);
ev_async_start(_loop, &_async_watcher);
ev_async_send(_loop, &_async_watcher);
_loop_thread.join();
_loop = nullptr;
}
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL);
}
这段代码通过 ev_async 机制实现了跨线程通知,确保主线程可以安全地终止事件循环线程。
void declareComponents(const std::string &exchange, const std::string &queue, const std::string &routing_key = "routing_key", AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct)
{
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message) {
LOG_ERROR("声明交换机失败:{}", message);
exit(0);
})
.onSuccess([exchange]() {
LOG_DEBUG("{} 交换机创建成功!", exchange);
});
_channel->declareQueue(queue)
.onError([](const char *message) {
LOG_ERROR("声明队列失败:{}", message);
exit(0);
})
.onSuccess([queue]() {
LOG_DEBUG("{} 队列创建成功!", queue);
});
_channel->bindQueue(exchange, queue, routing_key)
.onError([exchange, queue](const char *message) {
LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
exit(0);
})
.onSuccess([exchange, queue, routing_key]() {
LOG_DEBUG("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
});
}
bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key") {
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false) {
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
void consume(const std::string &queue, const MessageCallback &cb) {
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag")
.onReceived([this, cb](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
cb(message.body(), message.bodySize());
_channel->ack(deliveryTag);
})
.onError([queue](const char *message) {
LOG_ERROR("订阅 {} 队列消息失败:{}", queue, message);
exit(0);
});
}
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include <thread>
#include <memory>
#include "logger.hpp"
namespace IMS {
class MQClient {
public:
using MessageCallback = std::function<void(const char*, size_t)>;
using ptr = std::shared_ptr<MQClient>;
MQClient(const std::string &user, const std::string passwd, const std::string host) {
_loop = EV_DEFAULT;
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
_loop_thread = std::thread([this]() {
ev_run(_loop, 0);
});
}
~MQClient() {
ev_async_init(&_async_watcher, watcher_callback);
ev_async_start(_loop, &_async_watcher);
ev_async_send(_loop, &_async_watcher);
_loop_thread.join();
_loop = nullptr;
}
void declareComponents(const std::string &exchange, const std::string &queue, const std::string &routing_key = "routing_key", AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) {
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message) { LOG_ERROR("声明交换机失败:{}", message); exit(0); })
.onSuccess([exchange]() { LOG_DEBUG("{} 交换机创建成功!", exchange); });
_channel->declareQueue(queue)
.onError([](const char *message) { LOG_ERROR("声明队列失败:{}", message); exit(0); })
.onSuccess([queue]() { LOG_DEBUG("{} 队列创建成功!", queue); });
_channel->bindQueue(exchange, queue, routing_key)
.onError([exchange, queue](const char *message) { LOG_ERROR("{} - {} 绑定失败:", exchange, queue); exit(0); })
.onSuccess([exchange, queue, routing_key]() { LOG_DEBUG("{} - {} - {} 绑定成功!", exchange, queue, routing_key); });
}
bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key") {
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false) {
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
void consume(const std::string &queue, const MessageCallback &cb) {
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag")
.onReceived([this, cb](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
cb(message.body(), message.bodySize());
_channel->ack(deliveryTag);
})
.onError([queue](const char *message) { LOG_ERROR("订阅 {} 队列消息失败:{}", queue, message); exit(0); });
}
private:
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL);
}
struct ev_async _async_watcher;
struct ev_loop *_loop;
std::unique_ptr<AMQP::LibEvHandler> _handler;
std::unique_ptr<AMQP::TcpConnection> _connection;
std::unique_ptr<AMQP::TcpChannel> _channel;
std::thread _loop_thread;
};
}
测试
- 属于每条消息,由生产者发送消息时指定。
- 一条消息只有一个 Routing Key。
- 属于每个绑定,而每个绑定连接一个交换机和一个队列。
- 一个队列可以有多个绑定,因此可以有多个不同的 Binding Key。
- 一个绑定就是一条规则:"如果消息的 Routing Key 匹配这个 Binding Key,就把消息送到这个队列"。
步骤 1:生产者发送
生产者应用创建一个消息,为其指定一个路由键(Routing Key),然后将其发送到 Broker 上一个已知的 Exchange。
步骤 2:交换机路由
Exchange 收到消息后,会提取消息中的 Routing Key。然后,它查看所有绑定(Binding)到自身的规则列表。
步骤 3:绑定匹配与投递
对于每一条绑定规则,Exchange 会根据自身的类型,将消息的 Routing Key 与绑定的 Binding Key 进行匹配。
- 如果匹配成功,Exchange 就会将消息的一个副本投递到该绑定规则所指向的队列。
- 如果匹配失败,则跳过该队列。
- Exchange 会遍历所有绑定到它的规则,可能将消息投递到零个、一个或多个队列。
步骤 4:队列存储与消费
消息被投递到队列后,便存储在队列中。消费者应用从自己订阅的队列中获取消息进行处理。
交换机的类型决定了 Binding Key 和 Routing Key 的匹配算法。
- 匹配规则:精确、完全相等的字符串匹配。
- Binding Key:必须是一个明确的字符串,如 "email"、"order.paid"。
- Routing Key:必须与 Binding Key 完全一致,消息才会被路由。
- 队列 Q1 绑定了 Binding Key: "error"。
- 生产者发送消息 A (Routing Key: "info") -> 不匹配,消息 A 不会进入 Q1。
- 生产者发送消息 B (Routing Key: "error") -> 精确匹配,消息 B 进入 Q1。
匹配规则:通配符模式匹配。这是最灵活、最常用的路由方式。
Binding Key:是一个用点号.分隔的单词组成的模式,支持两个通配符:
- * (星号):匹配恰好一个单词。
-
(井号):匹配零个或多个单词。
Routing Key:也是一个用点号.分隔的单词组成的字符串(不能包含通配符)。
- 队列 Q1 绑定了 Binding Key: "*.stock.usd" -> 关心所有以.stock.usd 结尾,且中间有一个任意单词的消息。
- 队列 Q2 绑定了 Binding Key: "nyse.#" -> 关心所有以 nyse.开头的消息。
- 生产者发送消息 (Routing Key: "nyse.stock.usd") -> 同时匹配 Q1 和 Q2,消息会进入两个队列(广播)。
- 生产者发送消息 (Routing Key: "forex.eur.usd") -> 只匹配 Q1,消息进入 Q1。
- 生产者发送消息 (Routing Key: "nyse") -> 只匹配 Q2(#可以匹配零个单词),消息进入 Q2。
用途:基于多重标准的灵活消息路由,如日志系统、事件通知系统。
- 匹配规则:忽略 Routing Key 和 Binding Key。
- 行为:它会将收到的所有消息无条件地广播到所有与之绑定的队列。
- Binding Key:在创建绑定时通常设置为空字符串""(但设置什么值都无所谓,因为不会被使用)。
- 用途:纯粹的广播/发布 - 订阅场景。
那么在我们这里我们只会去测试这个 direct 交换机。
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include "../../common/rabbitmq.hpp"
int main() {
IMS::init_logger(false, "", 0);
IMS::MQClient client("root", "123456", "127.0.0.1:5672");
client.declareComponents("test-exchange", "test-queue", "test-key");
std::atomic<bool> messageReceived(false);
client.consume("test-queue", [&messageReceived](const char* data, size_t size) {
std::string msg(data, size);
std::cout << "Received message: " << msg << std::endl;
messageReceived = true;
});
std::this_thread::sleep_for(std::chrono::milliseconds(500));
bool pubSuccess = client.publish("test-exchange", "Hello, RabbitMQ!", "test-key");
if (!pubSuccess) {
std::cerr << "Failed to publish message!" << std::endl;
return 1;
}
std::cout << "Message published." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
if (messageReceived) {
std::cout << "Test passed: message received successfully." << std::endl;
return 0;
} else {
std::cerr << "Test failed: timeout waiting for message." << std::endl;
return 1;
}
}
[default-logger][20:35:48][396036][debug ][../../common/rabbitmq.hpp:150] 开始订阅 test-queue 队列消息!
[default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:93] test-exchange 交换机创建成功!
[default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:105] test-queue 队列创建成功!
[default-logger][20:35:48][396038][debug ][../../common/rabbitmq.hpp:117] test-exchange - test-queue - test-key 绑定成功!
[default-logger][20:35:49][396036][debug ][../../common/rabbitmq.hpp:133] 向交换机 test-exchange-test-key 发布消息!
Message published.
Received message: Hello, RabbitMQ!
#include "../../../../common/rabbitmq.hpp"
#include "../../../../common/logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq 访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq 访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq 服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
int main(int argc, char *argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
IMS::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
IMS::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
client.declareComponents("test-exchange", "test-queue");
for (int i = 0; i < 10; i++) {
std::string msg = "Hello Bite-" + std::to_string(i);
bool ret = client.publish("test-exchange", msg);
if (ret == false) {
std::cout << "publish 失败!\n";
}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
#include "../../../../common/rabbitmq.hpp"
#include "../../../../common/logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq 访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq 访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq 服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
void callback(const char *body, size_t sz) {
std::string msg;
msg.assign(body, sz);
std::cout << msg << std::endl;
}
int main(int argc, char *argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
IMS::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
IMS::MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
client.declareComponents("test-exchange", "test-queue");
client.consume("test-queue", callback);
std::this_thread::sleep_for(std::chrono::seconds(60));
return 0;
}
all : publish consume
publish : publish.cc
g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
consume : consume.cc
g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
.PHONY : clean
clean : rm publish consume
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online