跳到主要内容RabbitMQ 简介与快速上手 | 极客日志Javajava
RabbitMQ 简介与快速上手
消息队列(MQ)的基本概念及其在分布式系统中的作用,包括异步解耦、流量削峰、消息分发和延迟通知。重点讲解了 RabbitMQ 作为基于 Erlang 实现的 AMQP 协议中间件的架构特点。内容涵盖 Linux 环境下(Ubuntu 和 CentOS)的安装部署步骤、RabbitMQ 的核心概念(如 Virtual Host、Exchange、Queue、Connection、Channel 等)、工作流程以及基于 Java 语言的快速上手示例,包括生产者和消费者的代码实现。
静心3 浏览 MQ(Message Queue):本质是一个先进先出的队列,只不过里面储存的是消息而已。消息可以是文本,JSON,也可以是内嵌对象等等。通常用于分布式系统之间的系统通信。
- 异步解耦:在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。可以借助 MQ 把这些操作异步化,比如用户注册后发送注册短信或邮件通知,可以作为异步任务处理,而不必等待这些操作完成后才告知用户注册成功。
- 流量削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用 MQ 能够使关键组件支撑突发访问压力,不会因为突发流量而崩溃。比如秒杀或者促销活动,可以使用 MQ 来控制流量,将请求排队,然后系统根据自己的处理能力逐步处理这些请求。
- 消息分发:当多个系统需要对同一数据做出响应时,可以使用 MQ 进行消息分发。比如支付成功后,支付系统可以向 MQ 发送消息,其他系统订阅该消息,而无需轮询数据库。
- 延迟通知:在需要在特定时间后发送通知的场景中,可以使用 MQ 的延迟消息功能,比如在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。
二、RabbitMQ 简介
RabbitMQ:采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件。
三、Linux 下安装 RabbitMQ
3.1 Ubuntu 环境安装
sudo apt-get update
sudo apt-get install erlang
查看 erlang 版本:erl
退出命令:halt().
sudo apt-get update
sudo apt-get install rabbitmq-server
systemctl status rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
- 添加管理员用户
rabbitmqctl add_user ${账号} ${密码}
- 给用户添加权限
rabbitmqctl set_user_tags ${账号} ${角色名称}
RabbitMQ 用户角色分为 Administrator、Monitoring、Policymaker、Management、Impersonator、None 共六种角色
- Administrator 超级管理员,可登陆管理控制台 (启用 management plugin 的情况下),可查看所有的信息,并且可以对用户,策略 (policy) 进行操作
- Monitoring 监控者,可登陆管理控制台 (启用 management plugin 的情况下),同时可以查看 rabbitmq 节点的相关信息 (进程数,内存使用情况,磁盘使用情况等)
- Policymaker 策略制定者,可登陆管理控制台 (启用 management plugin 的情况下),同时可以对 policy 进行管理。但无法查看节点的相关信息
- Management 普通管理者,仅可登陆管理控制台 (启用 management plugin 的情况下),无法看到节点信息,也无法对策略进行管理
- Impersonator 模拟者,无法登录管理控制台
- None 其他用户,无法登陆管理控制台,通常就是普通的生产者和消费者
- 重启 RabbitMQ:
sudo systemctl restart rabbitmq-server
- 服务操作:
通过 IP:port 访问界面,默认端口号 15672
- 卸载 RabbitMQ:
9.1. 停止 RabbitMQ 服务:
sudo systemctl stop rabbitmq-server
9.2. 查找 RabbitMQ 安装情况 dpkg -l | grep rabbitmq
9.3. 卸载 rabbitmq 已安装的相关内容 sudo apt-get purge --auto-remove rabbitmq-server
9.4. 卸载 Erlang
9.4.1. 查看 erlang 安装的相关列表:dpkg -l | grep erlang
9.4.2. 卸载 erlang 已安装的相关内容 sudo apt-get purge --auto-remove erlang
3.2 CentOS 安装
-
安装 Erlang
1.1. 查看系统版本:cat /etc/redhat-release
1.2. 下载 Erlang 的 rpm 包:wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140"
1.3. 安装 Erlang : yum localinstall erlang-23.3.4.11-1.el7.x86_64.rpm
-
安装 RabbitMQ
2.1. 下载 RabbitMQ 客户端 wget --content-disposition "https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmqserver-3.8.30-1.el7.noarch.rpm/download.rpm?distro_version_id=140"
2.2. 安装 RabbitMQ 客户端:导入签名秘钥:rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc,用 yum 进行安装 yum localinstall rabbitmq-server-3.8.30-1.el7.noarch.rpm
-
安装 RabbitMQ 管理界面:rabbitmq-plugins enable rabbitmq_management
-
重启 RabbitMQ:sudo service rabbitmq-server restart
-
服务操作:
-
卸载 RabbitMQ:
6.1. 停止 RabbitMQ 服务:service rabbitmq-server stop
6.2. 查看 RabbitMQ 安装列表:yum list|grep rabbitmq
6.3 卸载 rabbitmq 已安装的相关内容 yum -y remove rabbitmq-server.noarch
6.4. 删除 RabbitMQ 相关文件 rm -rf /var/lib/rabbitmq/ rm -rf /usr/local/rabbitmq
6.5. 卸载 Erlang
6.5.1. 查看 erlang 安装的相关列表 yum list | grep erlang
6.5.2. 卸载 erlang 已安装的相关内容 yum -y remove erlang.x86_64
6.5.3. 删除 Erlang 相关文件 rm -rf /usr/lib64/erlang/ rm -rf /usr/local/erlang
四、工作流程
- 角色与组件
Broker(RabbitMQ Server)
├─ Virtual Host(vhost)——逻辑隔离单元,相当于数据库里的'库'
│ ├─ Exchange ——消息入口,负责路由
│ ├─ Queue ——消息缓冲,真正存储消息
│ ├─ Channel ——轻量级 TCP 子连接,生产/消费都在 channel 上发命令
│ ├─ Producer ——消息发布者
│ └─ Consumer ——消息接收者
└─ Connection ——TCP 长连接,一个 Connection 可开多条 Channel
- 一次完整的消息流转(单 vhost 内)
① Producer 建立 TCP Connection → 在 Connection 上创建 Channel
② 通过 Channel 把消息发给 Exchange,并指定 routing-key 等参数
③ Exchange 根据自身类型(direct / topic / fanout / headers)和 Binding 规则,把消息路由到 0~N 个 Queue
④ Queue 把消息按 FIFO 缓冲;若设置了持久化,则落地磁盘
⑤ Consumer 建立 Connection/Channel → 订阅(basic.consume)或单条获取(basic.get)Queue
⑥ Queue 把消息推/拉给 Consumer,Consumer 回复 ack(或自动 ack)
⑦ RabbitMQ 收到 ack 后删除该消息;若未收到 ack 且 Consumer 断开,消息重新入队(requeue)
多 vhost 与多角色并发
图中出现多个'Virtual Host'字样,说明可在同一 Broker 内跑多套隔离环境;不同 vhost 的 Exchange/Queue 完全隔离,互不可见。
- 多个 Producer1、Producer2、Consumer… 表明:
– 一个 Exchange 可对接多个 Producer;
– 一个 Queue 可对接多个 Consumer(平均分摊消息,实现负载均衡);
– 一个 Channel 只能串行收发,但应用可建多 Channel 并发。
- 一句话总结
'生产者把消息先给交换器,交换器按规则把消息转存到队列,队列再把消息分发给消费者'——这就是 RabbitMQ 最简也最核心的工作流程;而 Virtual Host、Connection、Channel 只是为了让这条流程在多租户、多线程、高并发场景下更安全、更高效。
五、核心概念
- Virtual host
Virtual host: 虚拟主机。这是一个虚拟概念。它为消息队列提供了一种逻辑上的隔离机制。对于 RabbitMQ 而言,一个 BrokerServer 上可以存在多个 Virtual Host。当多个不同的用户使用同一个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Exchange
Exchange: 交换机。message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中。
Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息.
Queue
Queue: 队列,是 RabbitMQ 的内部对象,用于存储消息.
Connection 和 Channel
Connection: 连接。是客户端和 RabbitMQ 服务器之间的一个 TCP 连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息.
Channel: 通道,信道。Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel 的。
通道的主要作用是将消息的读写操作复用到同一个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能.
Producer 和 Consumer
Producer: 生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息
Consumer: 消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息
Broker:其实就是 RabbitMQ Server,主要是接收和收发消息
六、快速上手示例
6.1 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
6.2 生产者
6.2.1 建立连接
建立连接需要:IP,端口号,账号、密码、虚拟主机。
我们使用 com.rabbitmq.client 包下的 ConnectionFactory 连接工厂类设置需要的配置,并且创建连接。
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("${username}");
connectionFactory.setPassword("${password}");
connectionFactory.setVirtualHost("${virtualHost}");
Connection connection = connectionFactory.newConnection();
6.2.2 开启信道 创建 Channel
Channel channel = connection.createChannel();
6.2.3 声明一个交换机和一个队列 queue
交换机我们使用 RabbitMQ 自带的,
声明队列使用 Channel 类的 queueDeclare 方法。queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明:
- queue:队列名
- durable:是否持久化。true-设置队列为持久化,待久化的队列会存盘,服务器重启之后,消息不丢失。
- exclusive:是否独占,只能有一个消费者监听队列
- autoDelete:是否自动删除,当没有消费者时,自动删除掉
- arguments:一些参数。
channel.queueDeclare("hello", true, false, true, null);
6.2.4 发送消息
通过 Channel 类的 basicPublish 方法发送消息到队列中 basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数说明:
- exchange:交换机名称,简单模式下,交换机会使用默认的""
- routingKey:路由名称,等于队列名称
- props:配置信息
- body:发送的消息
String msg = "hello rabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
6.2.5 释放资源
channel.close();
connection.close();
6.2.6 运行
package org.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("${username}");
connectionFactory.setPassword("${password}");
connectionFactory.setVirtualHost("${virtualHost}");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, true, null);
String msg = "hello rabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
channel.close();
connection.close();
}
}
6.3 消费者
- 创建连接
- 创建 Channel
- 声明一个队列 Queue
- 消费消息
- 释放资源
消费消息:使用 Channel 类的 basicConsume 方法 basicConsume(String queue, boolean autoAck, Consumer callback)
参数说明:
- queue: 队列名称
- autoAck: 是否自动确认,消费者收到消息之后,自动和 MQ 确认
- callback: 回调对象
Consumer 类说明:
Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息时,需要提供一個实现了 Consumer 接口的对象。
DefaultConsumer 类 是 RabbitMQ 提供的一个默认消费者,实现了 Consumer 接口。
核心方法:
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时,会自动调用该方法。
在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等。
参数说明如下:
- consumerTag :消费者标签,通常是消费者在订阅队列时指定的.
- envelope :包含消息的封包信息,如队列名称,交换机等.
- properties :一些配置信息
- body :消息的具体内容
package org.example.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("${username}");
connectionFactory.setPassword("${password}");
connectionFactory.setVirtualHost("${virtualHost}");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, true, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume("hello", true, consumer);
channel.close();
connection.close();
}
}
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online