Designing Asynchronous Message Queues, with Hands-On Rust Integration
1. Introduction
💡 Message queues are a core component of modern asynchronous programming and microservice architectures. By decoupling producers from consumers, enabling asynchronous communication, smoothing traffic peaks, and guaranteeing reliable delivery, they solve many of the challenges of high-concurrency, distributed systems. Rust's async features (built on the Tokio runtime) and memory-safety guarantees make it well suited to building high-performance, low-latency, reliable asynchronous message-queue applications.
This article covers the design principles of asynchronous message queues and their practical use in Rust. It walks through tokio::sync::mpsc, RabbitMQ (via lapin), and NATS (via nats-rs), with examples of producing and consuming, error handling, and persistence. A hands-on integration then demonstrates asynchronous inter-service communication across a user-sync service, an order-processing service, and a monitoring service. It closes with performance optimizations (queue length limits, batching, connection pooling, compression) and solutions to common problems (message loss, duplication, out-of-order delivery, and backlog).

This article examines the design principles of asynchronous message queues, introduces the message-queue libraries commonly used in the Rust async ecosystem (the built-in tokio::sync::mpsc, lapin for RabbitMQ, and nats-rs for NATS), and demonstrates, through a hands-on integration project, how to use message queues for asynchronous communication between a user-sync service, an order-processing service, and a monitoring service.
The core roles and components of an asynchronous message queue are the producer (which publishes messages), the consumer (which receives and processes them), the queue or broker that buffers messages between the two, and the messages themselves.
Asynchronous message queues in Rust have the following characteristics: tokio::sync::mpsc is a lightweight in-process channel with minimal overhead, while lapin and nats-rs support distributed message queues that scale easily to multiple nodes.
tokio::sync::mpsc
tokio::sync::mpsc is the multi-producer, single-consumer asynchronous channel built into the Tokio runtime. It is well suited to in-process communication, or to lightweight distributed communication when combined with a networking library.
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Create a bounded channel with a buffer size of 10
    let (tx, mut rx) = mpsc::channel(10);
    // Producer task
    let producer = tokio::spawn(async move {
        for i in 1..=5 {
            println!("Producer: Sending message {}", i);
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(500)).await;
        }
        println!("Producer: Finished sending messages");
    });
    // Consumer task
    let consumer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("Consumer: Received message {}", msg);
            sleep(Duration::from_millis(1000)).await;
        }
        println!("Consumer: Finished receiving messages");
    });
    // Wait for both tasks to complete
    producer.await.unwrap();
    consumer.await.unwrap();
}
use tokio::sync::mpsc;
use tokio::time::{sleep, timeout};
use std::time::Duration;
use thiserror::Error;

// Note: tokio's mpsc Receiver::recv returns Option rather than an error type,
// so the only receive failure we model here is a timeout.
#[derive(Error, Debug)]
enum QueueError {
    #[error("Receive timeout")]
    ReceiveTimeout,
    #[error("Send error: {0}")]
    SendError(#[from] mpsc::error::SendError<usize>),
}

async fn send_messages(tx: mpsc::Sender<usize>) -> Result<(), QueueError> {
    for i in 1..=3 {
        println!("Producer: Sending message {}", i);
        tx.send(i).await?;
        sleep(Duration::from_millis(600)).await;
    }
    Ok(())
}

async fn receive_messages(mut rx: mpsc::Receiver<usize>) -> Result<(), QueueError> {
    let timeout_duration = Duration::from_millis(1000);
    loop {
        match timeout(timeout_duration, rx.recv()).await {
            Ok(Some(msg)) => println!("Consumer: Received message {}", msg),
            Ok(None) => {
                println!("Consumer: Queue closed");
                break;
            }
            Err(_) => {
                println!("Consumer: Receive timeout");
                return Err(QueueError::ReceiveTimeout);
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(5);
    let producer = tokio::spawn(send_messages(tx));
    let consumer = tokio::spawn(receive_messages(rx));
    let producer_result = producer.await.unwrap();
    let consumer_result = consumer.await.unwrap();
    match (producer_result, consumer_result) {
        (Ok(()), Ok(())) => println!("All tasks completed successfully"),
        (Err(e), _) => println!("Producer error: {:?}", e),
        (_, Err(e)) => println!("Consumer error: {:?}", e),
    }
}
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
enum ProcessError {
    #[error("Invalid message: {0}")]
    InvalidMessage(usize),
}

async fn process_message(msg: usize) -> Result<(), ProcessError> {
    if msg % 2 == 0 {
        return Err(ProcessError::InvalidMessage(msg));
    }
    println!("Consumer: Processed message {}", msg);
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(5);
    let producer = tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(500)).await;
        }
    });
    let consumer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if let Err(e) = process_message(msg).await {
                println!("Consumer: Error processing message: {:?}", e);
            }
        }
    });
    producer.await.unwrap();
    consumer.await.unwrap();
}
lapin
lapin is the most popular asynchronous RabbitMQ client library for Rust. It implements the AMQP 0.9.1 protocol and is well suited to asynchronous communication in distributed systems.
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tokio_amqp::LapinTokioExt;

#[tokio::main]
async fn main() {
    let uri = "amqp://guest:guest@localhost:5672/%2F";
    let conn = Connection::connect(
        uri,
        ConnectionProperties::default().with_tokio(),
    )
    .await
    .unwrap();
    println!("Connected to RabbitMQ");
    let channel = conn.create_channel().await.unwrap();
    let queue = channel
        .queue_declare(
            "test_queue",
            QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                passive: false,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .unwrap();
    println!("Declared queue: {:?}", queue);
}
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tokio_amqp::LapinTokioExt;
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() {
    let uri = "amqp://guest:guest@localhost:5672/%2F";
    let conn = Connection::connect(
        uri,
        ConnectionProperties::default().with_tokio(),
    )
    .await
    .unwrap();
    let channel = conn.create_channel().await.unwrap();
    channel
        .queue_declare(
            "user_queue",
            QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                passive: false,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .unwrap();
    let users = vec![
        User {
            id: 1,
            name: "张三".to_string(),
            email: "[email protected]".to_string(),
        },
        User {
            id: 2,
            name: "李四".to_string(),
            email: "[email protected]".to_string(),
        },
    ];
    for user in users {
        let msg = serde_json::to_vec(&user).unwrap();
        channel
            .basic_publish(
                "",
                "user_queue",
                BasicPublishOptions::default(),
                msg.as_slice(),
                lapin::BasicProperties::default().with_delivery_mode(2), // delivery_mode 2 = persistent message
            )
            .await
            .unwrap()
            .await
            .unwrap();
        println!("Sent user: {:?}", user);
        sleep(Duration::from_millis(500)).await;
    }
    println!("All users sent successfully");
}
use futures::StreamExt; // lapin's Consumer implements Stream, so next() needs StreamExt
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tokio_amqp::LapinTokioExt;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() {
    let uri = "amqp://guest:guest@localhost:5672/%2F";
    let conn = Connection::connect(
        uri,
        ConnectionProperties::default().with_tokio(),
    )
    .await
    .unwrap();
    let channel = conn.create_channel().await.unwrap();
    let queue = channel
        .queue_declare(
            "user_queue",
            QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                passive: false,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .unwrap();
    println!("Declared queue: {:?}", queue);
    let mut consumer = channel
        .basic_consume(
            "user_queue",
            "user_consumer",
            BasicConsumeOptions {
                no_ack: false,
                exclusive: false,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .unwrap();
    println!("Consumer registered");
    while let Some(delivery) = consumer.next().await {
        let delivery = delivery.unwrap();
        let user: User = serde_json::from_slice(&delivery.data).unwrap();
        println!("Received user: {:?}", user);
        // Acknowledge the message manually
        delivery.ack(BasicAckOptions::default()).await.unwrap();
    }
}
nats-rs
NATS is a lightweight, cloud-native messaging system with high performance, low latency, and a simple API; nats-rs is its official asynchronous Rust client library.
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Order {
    id: i32,
    user_id: i32,
    product_id: i32,
    quantity: i32,
}

#[tokio::main]
async fn main() {
    let nc = nats::asynk::connect("nats://localhost:4222").await.unwrap();
    let orders = vec![
        Order {
            id: 1,
            user_id: 1,
            product_id: 101,
            quantity: 2,
        },
        Order {
            id: 2,
            user_id: 2,
            product_id: 102,
            quantity: 1,
        },
    ];
    for order in orders {
        let msg = serde_json::to_vec(&order).unwrap();
        nc.publish("orders.new", &msg).await.unwrap();
        println!("Published order: {:?}", order);
        sleep(Duration::from_millis(500)).await;
    }
    println!("All orders published successfully");
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Order {
    id: i32,
    user_id: i32,
    product_id: i32,
    quantity: i32,
}

async fn process_order(order: Order) {
    println!("Processing order: {:?}", order);
    // Simulate order-processing work
    tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
    println!("Order processed: {:?}", order);
}

#[tokio::main]
async fn main() {
    let nc = nats::asynk::connect("nats://localhost:4222").await.unwrap();
    // Queue subscription (messages are load-balanced across the group)
    let subscription = nc.queue_subscribe("orders.new", "order_processing_group").await.unwrap();
    println!("Queue subscription registered");
    while let Some(message) = subscription.next().await {
        let order: Order = serde_json::from_slice(&message.data).unwrap();
        tokio::spawn(process_order(order));
    }
}
We now integrate asynchronous message queues into three microservices: the user-sync service publishes user-change events, the order-processing service consumes those events (and publishes order events of its own), and the monitoring service subscribes to both streams for logging and metrics.
First, define the message types in a shared crate, using serde for serialization and deserialization:
// shared/src/lib.rs
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserChangeEvent {
    pub user_id: i32,
    pub event_type: UserEventType,
    pub old_data: Option<UserData>,
    pub new_data: Option<UserData>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum UserEventType {
    Created,
    Updated,
    Deleted,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserData {
    pub id: i32,
    pub name: String,
    pub email: String,
    pub phone: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OrderEvent {
    pub order_id: i32,
    pub user_id: i32,
    pub product_id: i32,
    pub quantity: i32,
    pub event_type: OrderEventType,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum OrderEventType {
    Created,
    Updated,
    Deleted,
    Paid,
    Shipped,
    Delivered,
}
// user-sync-service/src/mq/producer.rs
use crate::config::Config;
use crate::sync::ThirdPartyUser;
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use shared::{UserChangeEvent, UserData, UserEventType};
use tokio_amqp::LapinTokioExt;

#[derive(Clone)]
pub struct UserChangeProducer {
    channel: lapin::Channel,
    queue_name: String,
}

impl UserChangeProducer {
    pub async fn new(config: &Config) -> Result<Self, Box<dyn std::error::Error>> {
        let conn = Connection::connect(&config.rabbitmq.uri, ConnectionProperties::default().with_tokio()).await?;
        let channel = conn.create_channel().await?;
        let queue = channel
            .queue_declare(
                &config.rabbitmq.user_change_queue,
                QueueDeclareOptions {
                    durable: true,
                    auto_delete: false,
                    exclusive: false,
                    passive: false,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await?;
        println!("Declared user change queue: {:?}", queue);
        Ok(UserChangeProducer {
            channel,
            queue_name: config.rabbitmq.user_change_queue.clone(),
        })
    }

    pub async fn send_event(&self, event: UserChangeEvent) -> Result<(), Box<dyn std::error::Error>> {
        let msg = serde_json::to_vec(&event)?;
        self.channel
            .basic_publish(
                "",
                &self.queue_name,
                BasicPublishOptions::default(),
                msg.as_slice(),
                BasicProperties::default().with_delivery_mode(2), // persistent message
            )
            .await?
            .await?;
        println!("Sent user change event: {:?}", event);
        Ok(())
    }

    pub async fn send_created_event(&self, user: &ThirdPartyUser) -> Result<(), Box<dyn std::error::Error>> {
        let event = UserChangeEvent {
            user_id: user.id,
            event_type: UserEventType::Created,
            old_data: None,
            new_data: Some(UserData {
                id: user.id,
                name: user.name.clone(),
                email: user.email.clone(),
                phone: user.phone.clone(),
            }),
        };
        self.send_event(event).await
    }

    pub async fn send_updated_event(&self, old_user: &ThirdPartyUser, new_user: &ThirdPartyUser) -> Result<(), Box<dyn std::error::Error>> {
        let event = UserChangeEvent {
            user_id: new_user.id,
            event_type: UserEventType::Updated,
            old_data: Some(UserData {
                id: old_user.id,
                name: old_user.name.clone(),
                email: old_user.email.clone(),
                phone: old_user.phone.clone(),
            }),
            new_data: Some(UserData {
                id: new_user.id,
                name: new_user.name.clone(),
                email: new_user.email.clone(),
                phone: new_user.phone.clone(),
            }),
        };
        self.send_event(event).await
    }

    pub async fn send_deleted_event(&self, user: &ThirdPartyUser) -> Result<(), Box<dyn std::error::Error>> {
        let event = UserChangeEvent {
            user_id: user.id,
            event_type: UserEventType::Deleted,
            old_data: Some(UserData {
                id: user.id,
                name: user.name.clone(),
                email: user.email.clone(),
                phone: user.phone.clone(),
            }),
            new_data: None,
        };
        self.send_event(event).await
    }
}
// order-processing-service/src/mq/consumer.rs
use crate::config::Config;
use crate::db::OrderRepository;
use futures::StreamExt; // for consumer.next()
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use shared::{UserChangeEvent, UserEventType};
use tokio_amqp::LapinTokioExt;

#[derive(Clone)]
pub struct UserChangeConsumer {
    channel: lapin::Channel,
    queue_name: String,
    order_repo: OrderRepository,
}

impl UserChangeConsumer {
    pub async fn new(config: &Config, order_repo: OrderRepository) -> Result<Self, Box<dyn std::error::Error>> {
        let conn = Connection::connect(&config.rabbitmq.uri, ConnectionProperties::default().with_tokio()).await?;
        let channel = conn.create_channel().await?;
        let queue = channel
            .queue_declare(
                &config.rabbitmq.user_change_queue,
                QueueDeclareOptions {
                    durable: true,
                    auto_delete: false,
                    exclusive: false,
                    passive: false,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await?;
        println!("Declared user change queue: {:?}", queue);
        Ok(UserChangeConsumer {
            channel,
            queue_name: config.rabbitmq.user_change_queue.clone(),
            order_repo,
        })
    }

    pub async fn start_consuming(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut consumer = self.channel
            .basic_consume(
                &self.queue_name,
                "order_processing_consumer",
                BasicConsumeOptions {
                    no_ack: false,
                    exclusive: false,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await?;
        println!("User change consumer registered");
        while let Some(delivery) = consumer.next().await {
            let delivery = delivery?;
            let event: UserChangeEvent = serde_json::from_slice(&delivery.data)?;
            println!("Received user change event: {:?}", event);
            match event.event_type {
                UserEventType::Created => {
                    // Nothing to do: a newly created user has no orders yet
                }
                UserEventType::Updated => {
                    // Propagate the updated user info to that user's orders
                    if let Some(new_data) = event.new_data {
                        self.order_repo.update_user_info(event.user_id, &new_data.name, &new_data.email).await?;
                    }
                }
                UserEventType::Deleted => {
                    // Soft-delete the user's orders when the user is deleted
                    self.order_repo.soft_delete_by_user_id(event.user_id).await?;
                }
            }
            delivery.ack(BasicAckOptions::default()).await?;
        }
        Ok(())
    }
}
// monitoring-service/src/mq/consumer.rs
use crate::config::Config;
use shared::{OrderEvent, UserChangeEvent};

#[derive(Clone)]
pub struct EventConsumer {
    nc: nats::asynk::Connection,
    order_repo: crate::db::OrderRepository,
    user_repo: crate::db::UserRepository,
}

impl EventConsumer {
    pub async fn new(config: &Config, order_repo: crate::db::OrderRepository, user_repo: crate::db::UserRepository) -> Result<Self, Box<dyn std::error::Error>> {
        let nc = nats::asynk::connect(&config.nats.uri).await?;
        Ok(EventConsumer {
            nc,
            order_repo,
            user_repo,
        })
    }

    pub async fn start_consuming(&self) -> Result<(), Box<dyn std::error::Error>> {
        let user_subscription = self.nc.subscribe("users.change").await?;
        let order_subscription = self.nc.subscribe("orders.new").await?;
        println!("Event consumer registered");
        tokio::spawn(async move {
            while let Some(message) = user_subscription.next().await {
                let event: UserChangeEvent = serde_json::from_slice(&message.data).unwrap();
                println!("Received user change event: {:?}", event);
                // Log the user-change event
                crate::monitor::log_event("user_change", &event).await;
            }
        });
        tokio::spawn(async move {
            while let Some(message) = order_subscription.next().await {
                let event: OrderEvent = serde_json::from_slice(&message.data).unwrap();
                println!("Received order event: {:?}", event);
                // Log the order event
                crate::monitor::log_event("order_event", &event).await;
            }
        });
        Ok(())
    }
}
Limiting queue length prevents a message backlog from exhausting memory or crashing the system. With tokio::sync::mpsc, you set the buffer size when creating the channel; with RabbitMQ, you set the x-max-length argument when declaring the queue.
RabbitMQ queue length limit:
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tokio_amqp::LapinTokioExt;

#[tokio::main]
async fn main() {
    let uri = "amqp://guest:guest@localhost:5672/%2F";
    let conn = Connection::connect(
        uri,
        ConnectionProperties::default().with_tokio(),
    )
    .await
    .unwrap();
    let channel = conn.create_channel().await.unwrap();
    let mut args = FieldTable::default();
    args.insert("x-max-length".into(), 10000.into()); // cap the queue at 10,000 messages
    let queue = channel
        .queue_declare(
            "limited_queue",
            QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                passive: false,
                ..Default::default()
            },
            args,
        )
        .await
        .unwrap();
    println!("Declared limited queue: {:?}", queue);
}
Batching messages reduces per-message network and queue overhead and raises throughput. With tokio::sync::mpsc, the consumer can drain several buffered messages at once; with RabbitMQ and NATS, you can publish in a loop and flush once, letting the client buffer the writes.
NATS message batching:
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Order {
    id: i32,
    user_id: i32,
    product_id: i32,
    quantity: i32,
}

#[tokio::main]
async fn main() {
    let nc = nats::asynk::connect("nats://localhost:4222").await.unwrap();
    let orders = vec![
        Order {
            id: 1,
            user_id: 1,
            product_id: 101,
            quantity: 2,
        },
        Order {
            id: 2,
            user_id: 2,
            product_id: 102,
            quantity: 1,
        },
        Order {
            id: 3,
            user_id: 3,
            product_id: 103,
            quantity: 3,
        },
    ];
    // The nats client buffers writes internally, so publishing in a tight
    // loop and flushing once behaves like a batch publish.
    for order in &orders {
        let msg = serde_json::to_vec(order).unwrap();
        nc.publish("orders.batch", &msg).await.unwrap();
    }
    nc.flush().await.unwrap();
    println!("Batch published successfully");
}
Connection pooling amortizes the cost of establishing and tearing down connections and improves concurrency. For RabbitMQ, an async pool such as deadpool-lapin works well; a NATS connection is multiplexed internally and is normally shared across tasks rather than pooled.
RabbitMQ connection pool:
use deadpool_lapin::{Config, PoolConfig, Runtime};
use lapin::{options::QueueDeclareOptions, types::FieldTable};

#[tokio::main]
async fn main() {
    // r2d2 is a synchronous pool and does not fit lapin's async API,
    // so this example uses the async deadpool-lapin pool instead.
    let mut cfg = Config::default();
    cfg.url = Some("amqp://guest:guest@localhost:5672/%2F".into());
    cfg.pool = Some(PoolConfig::new(10)); // at most 10 connections
    let pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap();
    println!("RabbitMQ connection pool created");
    // Take a connection from the pool
    let conn = pool.get().await.unwrap();
    let channel = conn.create_channel().await.unwrap();
    let queue = channel
        .queue_declare("pool_queue", QueueDeclareOptions::default(), FieldTable::default())
        .await
        .unwrap();
    println!("Declared pool queue: {:?}", queue);
}
Compressing messages reduces network and storage overhead and improves throughput. Text payloads compress well with gzip or snappy; for binary payloads, lz4 offers a good speed/ratio trade-off.
Compressing messages with gzip:
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::{Read, Write};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct LargeData {
    id: i32,
    data: Vec<u8>,
}

fn compress(data: &[u8]) -> Vec<u8> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data).unwrap();
    encoder.finish().unwrap()
}

fn decompress(data: &[u8]) -> Vec<u8> {
    let mut decoder = flate2::read::GzDecoder::new(data);
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed).unwrap();
    decompressed
}

#[tokio::main]
async fn main() {
    let nc = nats::asynk::connect("nats://localhost:4222").await.unwrap();
    // Subscribe before publishing: core NATS has no persistence, so a message
    // published before the subscription exists is simply dropped.
    let subscription = nc.subscribe("large_data.compressed").await.unwrap();
    let large_data = LargeData {
        id: 1,
        data: vec![0; 1024 * 1024], // 1 MB of data
    };
    let uncompressed = serde_json::to_vec(&large_data).unwrap();
    let compressed = compress(&uncompressed);
    println!("Uncompressed size: {} bytes", uncompressed.len());
    println!("Compressed size: {} bytes", compressed.len());
    nc.publish("large_data.compressed", &compressed).await.unwrap();
    println!("Compressed data published");
    let message = subscription.next().await.unwrap();
    let decompressed = decompress(&message.data);
    let received: LargeData = serde_json::from_slice(&decompressed).unwrap();
    println!("Received decompressed data, id: {}", received.id);
}
Problem: messages are lost in transit or during processing, leaving system state inconsistent.
Causes: the producer's send fails without confirmation, the broker crashes before a non-persistent message reaches disk, or the consumer acknowledges (or auto-acks) a message before actually processing it.
Solutions: enable publisher confirms so the producer knows each message reached the broker (in lapin, call confirm_select on the channel and await the returned confirmation); declare queues as durable and mark messages persistent by setting delivery_mode to 2 (RabbitMQ) or by using a persistent stream (NATS JetStream); use manual acknowledgements and ack only after processing succeeds.
Problem: the consumer receives duplicate messages, causing incorrect application logic.
Causes: most brokers provide at-least-once delivery, so a message is redelivered whenever an acknowledgement is lost, times out, or the consumer crashes after processing but before acking.
Solutions: make consumers idempotent — attach a unique message id and skip ids that have already been processed, or design handlers so that applying the same message twice has no additional effect.
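An idempotent consumer can be sketched with an in-memory set of processed message ids; a real service would persist the ids (for example in a database or Redis with a TTL), so the struct below is illustrative:

```rust
use std::collections::HashSet;

// Track already-processed message ids so redeliveries are ignored.
struct IdempotentConsumer {
    seen: HashSet<u64>,
}

impl IdempotentConsumer {
    fn new() -> Self {
        IdempotentConsumer { seen: HashSet::new() }
    }

    // Returns true if the message was processed, false if it was a duplicate.
    fn process_once(&mut self, msg_id: u64) -> bool {
        if !self.seen.insert(msg_id) {
            return false; // at-least-once delivery replayed this message
        }
        // ... real side effects (DB write, payment, email) would go here ...
        true
    }
}

fn main() {
    let mut consumer = IdempotentConsumer::new();
    // Simulate the broker redelivering messages 1 and 2 (e.g. lost acks).
    let deliveries = [1u64, 2, 1, 3, 2];
    let processed: Vec<bool> = deliveries.iter().map(|id| consumer.process_once(*id)).collect();
    assert_eq!(processed, vec![true, true, false, true, false]);
    println!("duplicates ignored: {:?}", processed);
}
```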
Problem: the consumer observes messages in a different order than the producer sent them.
Causes: multiple competing consumers process messages concurrently, retries and redeliveries re-enqueue messages behind newer ones, and messages routed across several queues or connections have no global order.
Solutions: use a single consumer (or a single ordered partition per entity key) where strict ordering matters, or attach a sequence number to each message and reorder on the consumer side.
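Consumer-side reordering by sequence number can be sketched with a small buffer that holds early arrivals and releases messages only once the next expected sequence number is present (the Reorderer type below is illustrative):

```rust
use std::collections::BTreeMap;

// Buffer out-of-order messages and release them strictly in sequence order.
struct Reorderer {
    next_seq: u64,
    pending: BTreeMap<u64, String>,
}

impl Reorderer {
    fn new() -> Self {
        Reorderer { next_seq: 0, pending: BTreeMap::new() }
    }

    // Accept a (seq, payload) pair; return every message now deliverable in order.
    fn push(&mut self, seq: u64, payload: String) -> Vec<String> {
        self.pending.insert(seq, payload);
        let mut ready = Vec::new();
        while let Some(p) = self.pending.remove(&self.next_seq) {
            ready.push(p);
            self.next_seq += 1;
        }
        ready
    }
}

fn main() {
    let mut r = Reorderer::new();
    // Message 1 arrives before message 0, so it is held back.
    assert!(r.push(1, "b".into()).is_empty());
    // Once 0 arrives, both are released in order.
    assert_eq!(r.push(0, "a".into()), vec!["a".to_string(), "b".to_string()]);
    assert_eq!(r.push(2, "c".into()), vec!["c".to_string()]);
    println!("messages delivered in order");
}
```

In production this buffer needs a size or time bound so a permanently missing sequence number cannot stall delivery forever.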
Problem: consumers process messages more slowly than producers send them, so messages pile up in the queue.
Causes: slow handlers (blocking I/O, long database calls), too few consumers for the incoming rate, or traffic bursts with no backpressure.
Solutions: increase consumer concurrency (more consumer instances, or bounded concurrent processing within one consumer), batch work where possible, cap queue length so producers are slowed instead of memory growing without bound, and set a sensible prefetch limit (basic_qos in RabbitMQ).
Asynchronous message queues are a core component of modern asynchronous programming and microservice architectures. Through decoupling, asynchronous communication, peak smoothing, and reliability guarantees, they solve many of the challenges of high-concurrency, distributed systems. Rust's async features and memory-safety guarantees make it well suited to building high-performance, low-latency, reliable message-queue applications.
This article examined the design principles of asynchronous message queues, introduced the libraries commonly used in the Rust async ecosystem (the built-in tokio::sync::mpsc, lapin for RabbitMQ, and nats-rs for NATS), and demonstrated, through a hands-on integration project, how to use message queues for asynchronous communication between a user-sync service, an order-processing service, and a monitoring service.
With this material, you should understand how asynchronous message queues work, know how to implement them, and be able to build efficient, reliable message-queue applications in real projects.
