1. Message Queue 概述
计算机之间的通信方式主要有两种:同步通信和异步通信。
同步通信 (Synchronous Communication):通信双方在严格的时间约束下进行交互。发送方发送请求或数据后,会主动等待并阻塞自身,直到收到接收方的明确响应 (成功、失败或超时) 才会继续执行后续操作。整个过程像是在进行一场'实时对话'。
异步通信 (Asynchronous Communication):发送方发出请求或消息后,不等待接收方的即时响应,而是立即返回并继续执行后续任务。接收方在准备好结果后,通过某种机制将响应或结果'推送'或'通知'给发送方。整个过程更像是'发送邮件'。
MQ (Message Queue,消息队列) 是一种用于实现异步通信的中间件技术。它允许不同组件 (例如应用程序、服务或微服务) 通过发送和接收消息来进行通信,而无需实时等待响应。核心思想是解耦生产者 (发送消息) 和消费者 (接收消息) 之间的关系。其核心功能如下:
削峰填谷 (缓冲):当消费者处理能力有限或暂时不可用时,队列可以作为缓冲区,暂存生产者发送的大量消息,避免系统过载或消息丢失。待消费者恢复或扩展后,再逐步消费积压的消息。

异步解耦:生产者和消费者不需要知道对方的存在或状态。生产者只需将消息发送到队列,消费者在准备好时从队列中获取消息进行处理。

2. 初识 RabbitMQ
AMQP 是一个开放标准的应用层协议,设计用于异步、可靠、跨平台的消息传递。它定义了消息的格式、传递机制和路由规则,是构建分布式系统中消息中间件的核心协议。
RabbitMQ是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件。
2.1 安装
启动 RabbitMQ
root@VM-0-7-ubuntu:~# service rabbitmq-server start
安装 RabbitMQ 管理界面
root@VM-0-7-ubuntu:~# rabbitmq-plugins enable rabbitmq_management
安装 RabbitMQ
root@VM-0-7-ubuntu:~# apt-get install rabbitmq-server
root@VM-0-7-ubuntu:~# systemctl status rabbitmq-server

安装 Erlang
root@VM-0-7-ubuntu:~# apt-get update
root@VM-0-7-ubuntu:~# apt-get install erlang
root@VM-0-7-ubuntu:~# erl
1> halt().

2.2 访问
- 5672 端口是 RabbitMQ 的默认 AMQP 协议端口。生产者和消费者通过此端口与 RabbitMQ 交互。
- 15672 端口是 RabbitMQ 管理插件的默认 HTTP 端口。通过此端口可以访问 RabbitMQ 的 Web 管理界面,提供图形化的管理功能。
添加管理员用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
2.3 工作流程
| RabbitMQ 组件 | 邮局系统类比 | 作用 |
|---|
| Broker(代理) | 整个邮局 | 整个消息队列系统,包含所有组件 |
| Virtual Host(虚拟主机) | 不同地域的邮局分局 | 逻辑隔离,每个 vhost 有独立的用户/权限/队列 |
| Connection(连接) | 电话专线 | 生产者和消费者之间的 TCP 连接 |
| Channel(信道) | 分机电话 | 虚拟连接,复用同一个 TCP 连接 |
| Exchange(交换机) | 邮件分拣中心 | 接收消息并决定路由到哪个队列 |
| Queue(队列) | 收件人的邮箱 | 存储消息等待消费者取走 |
| Binding(绑定) | 分拣规则 | 交换机和队列之间的路由规则 |

第一阶段:连接建立
建立 TCP 连接:生产者应用程序首先与 Broker 建立 TCP 连接,默认端口为 5672。连接需要指定虚拟主机、用户名和密码。
创建信道:在已建立的 TCP 连接上创建一个或多个信道。信道是虚拟连接,共享同一个 TCP 连接。每个信道有独立的通信流,相互隔离。
第二阶段:资源声明
声明交换机:指定名称、类型和属性。
声明队列:指定名称和属性。
创建绑定:将队列绑定到交换机,并指定绑定键 (Binding Key)。
第三阶段:消息发送
发送消息到交换机:将消息发布到指定的交换机。消息包含:交换机名称、路由键、消息体、消息属性。
交换机路由消息:交换机接收消息后,根据其类型和绑定规则进行路由。
第四阶段:路由处理
成功路由:队列接收并存储消息,等待消费者拉取或推送消息给消费者。
3. RabbitMQ 六种工作模式
3.1 Simple
核心特征:
- 最简单的消息队列模式
- 一个生产者,一个消费者,一个队列
工作流程:

代码实现:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
for (int i ; i < ; i++) {
channel.basicPublish(, Constants.WORK_QUEUE, , ( + i).getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, , defaultConsumer);
Thread.sleep();
channel.close();
connection.close();
}
}
3.2 Work Queues
核心特征:
- 多个消费者竞争消费同一个队列
- 轮询分发 (默认):每个消费者依次接收消息
工作流程:

代码实现:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Constants.WORK_QUEUE, , ( + i).getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, , defaultConsumer);
Thread.sleep();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.WORK_QUEUE, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, , defaultConsumer);
Thread.sleep();
}
}
3.3 Publish/Subscribe
核心特征:
工作流程:

代码实现:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, , );
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, );
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, );
( ; i < ; i++) {
channel.basicPublish(Constants.FANOUT_EXCHANGE, , , ( + i).getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE1, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, , defaultConsumer);
Thread.sleep();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.FANOUT_QUEUE2, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE2, , defaultConsumer);
Thread.sleep();
}
}
3.4 Routing
核心特征:
工作流程:

代码实现:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, , );
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, );
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, );
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, );
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, );
( ; i < ; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, , , ( + i).getBytes());
}
( ; i < ; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, , , ( + i).getBytes());
}
( ; i < ; i++) {
channel.basicPublish(Constants.DIRECT_EXCHANGE, , , ( + i).getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE1, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, , defaultConsumer);
Thread.sleep();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.DIRECT_QUEUE2, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2, , defaultConsumer);
Thread.sleep();
}
}
3.5 Topics
核心特征:
- 根据路由键选择性接收消息
- 支持通配符匹配
- 通配符规则
*(星号):匹配一个单词
#(井号):匹配零个或多个单词
- 单词用点号分隔,如 stock.usd.nyse
工作流程:

代码实现:与 Routing 模式基本一致
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, , );
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, );
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, );
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, );
( ; i < ; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, , , ( + i).getBytes());
}
( ; i < ; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, , , ( + i).getBytes());
}
( ; i < ; i++) {
channel.basicPublish(Constants.TOPIC_EXCHANGE, , , ( + i).getBytes());
}
System.out.println();
channel.close();
connection.close();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE1, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, , defaultConsumer);
Thread.sleep();
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException, InterruptedException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.TOPIC_QUEUE2, , , , );
(channel) {
{
System.out.println( + (body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE2, , defaultConsumer);
Thread.sleep();
}
}
3.6 RPC
RPC(Remote Procedure Call,远程过程调用):通过网络从远程计算机上请求服务并同步等待服务端的响应。
核心特征:
- 请求/响应模式
- 实现远程过程调用,使用回调队列和关联 ID
工作流程:

- 当客户端启动时,它创建一个独占回调队列。
- 对于 RPC 请求,客户端发送一个具有两个属性的消息:reply_to,它被设置为回调队列,correlation_id,它被设置为每个请求的唯一值。
- 请求被发送到 rpc_queue 队列。
- RPC 工作者 (又名:服务器) 正在等待队列上的请求。当出现请求时,它完成任务,并使用 replyTo 字段中的队列将带有结果的消息发送回客户机。
- 客户端在应答队列上等待数据。当消息出现时,它会检查 correlationId 属性。如果与请求的值匹配,则将响应返回给应用程序。
代码实现:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
String correlationID UUID.randomUUID().toString();
AMQP. .BasicProperties.Builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
channel.basicPublish(, Constants.RPC_REQUEST_QUEUE, properties, ().getBytes());
BlockingQueue<String> response = <>();
(channel) {
{
System.out.println( + (body));
(correlationID.equals(properties.getCorrelationId())) {
(!response.offer( (body))) {
System.out.println();
}
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, , defaultConsumer);
System.out.println( + response.take());
}
}
com.rabbitmq.client.*;
rabbitmq.constant.Constants;
java.io.IOException;
java.nio.charset.StandardCharsets;
java.util.concurrent.TimeoutException;
{
IOException, TimeoutException {
();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
connectionFactory.newConnection();
connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, , , , );
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, , , , );
channel.basicQos();
(channel) {
IOException {
(body, StandardCharsets.UTF_8);
System.out.println( + request);
System.out.println( + request + );
AMQP. .BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish(, Constants.RPC_RESPONSE_QUEUE, basicProperties, request.getBytes());
channel.basicAck(envelope.getDeliveryTag(), );
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, , defaultConsumer);
}
}