详解RabbitMQ高级特性之死信队列

详解RabbitMQ高级特性之死信队列

目录

死信队列

添加配置

常量类

声明队列和交换机并绑定二者关系

死信--消息过期

给队列设置TTL

编写生产消息代码

编写消费消息代码

观察现象

死信--消息超过队列最大长度

设置队列的最大长度

编写生产消息代码

编写消费消息代码

观察现象

死信--消息被拒绝

编写生产消息代码

编写消费消息代码

观察现象

面试题


死信队列

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.
有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器
中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead
Letter Queue,简称DLQ).

消息变成死信⼀般是由于以下⼏种情况:

1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
2. 消息过期.
3. 队列达到最⼤⻓度.
添加配置
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://study:[email protected]:5672/extension
常量类
public class Constants { //死信 public static final String NORMAL_QUEUE = "normal.queue"; public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String DL_QUEUE = "dl.queue"; public static final String DL_EXCHANGE= "dl.exchange"; }
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import rabbitextensionsdemo.constant.Constants; @Configuration public class DLConfig { //正常的交换机和队列 @Bean("normalQueue") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE) .deadLetterRoutingKey("dlx") .build(); } @Bean("normalExchange") public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } @Bean("normalBinding") public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("normal").noargs(); } //死信交换机和队列 @Bean("dlQueue") public Queue dlQueue(){ return QueueBuilder.durable(Constants.DL_QUEUE).build(); } @Bean("dlExchange") public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } @Bean("dlBinding") public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs(); } }
死信--消息过期
给队列设置TTL
编写生产消息代码
 @RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }
观察现象

我们可以看到,消息在10秒后过期,从normal队列进入到了死信队列,消息进入到死信队列后被消费。

死信--消息超过队列最大长度
设置队列的最大长度
编写生产消息代码
 @RequestMapping("/dl") public String dl() { //测试队列长度 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i); } return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; import java.util.Date; @Component public class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); } }
观察现象

此时我们可以看到,给队列设置了最大长度为10,但是队列接收到了20条消息,就会导致前10条消息变成死信。

死信--消息被拒绝
编写生产消息代码
 @RequestMapping("/dl") public String dl() { System.out.println("dl..."); //发送普通消息 rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."); System.out.printf("%tc 消息发送成功 \n", new Date()); return "消息发送成功"; }
编写消费消息代码
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import rabbitextensionsdemo.constant.Constants; @Component public class DLListener { @RabbitListener(queues = Constants.NORMAL_QUEUE) public void handMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println("业务逻辑处理"); int num = 3/0; System.out.println("业务处理完成"); //肯定确认 channel.basicAck(deliveryTag,false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, false); //requeue为false, 该消息成为死信 } } }
观察现象

可以看到,normal队列中的消息在被消费时因为发生了异常而执行到了拒绝消息的代码,而且设置了消息不重新入队,导致消息变成了死信,进而进入到了死信队列。

面试题

1.死信队列的概念

死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息。

2.死信的来源

1) 消息过期: 消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
3) 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.

3.死信的应用场景 

对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.

⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态
为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).

场景的应⽤场景还有:

• 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
• 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
• ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.

Read more

Flutter 三方库 arcane_helper_utils 的鸿蒙化适配指南 - 实现具备通用逻辑增强与多维开发脚手架的实用工具集、支持端侧业务开发的效率倍增实战

Flutter 三方库 arcane_helper_utils 的鸿蒙化适配指南 - 实现具备通用逻辑增强与多维开发脚手架的实用工具集、支持端侧业务开发的效率倍增实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 arcane_helper_utils 的鸿蒙化适配指南 - 实现具备通用逻辑增强与多维开发脚手架的实用工具集、支持端侧业务开发的效率倍增实战 前言 在进行 Flutter for OpenHarmony 开发时,如何快速处理常见的字符串格式化、色值转换、日期计算或布尔值增强?虽然每一个功能都很小,但如果每个项目都重复造轮子,开发效率将大打折扣。arcane_helper_utils 是一款专注于极致实用的“瑞士军刀”型工具集。本文将探讨如何在鸿蒙端通过这类高内聚的 Utility 集实现极致、丝滑的业务交付。 一、原直观解析 / 概念介绍 1.1 基础原理 该库通过对 Dart 原生类型(Object, String, List, Map, Bool)

Windows 安装 Neo4j(2025最新·极简)

Windows 安装 Neo4j(2025最新·极简)

目录 1. 准备 2. 下载安装包 3. 一键安装 4. 启动 Neo4j 5.安装 Neo4j 的系统服务 Neo4j 是目前最流行的原生图数据库,用图结构(节点-关系-属性)存储数据,而非传统表结构。它专为海量关联数据设计,提供: * 原生图存储:基于免索引邻接结构,每个节点直接维护指向相邻节点的物理指针,实现 O(1) 时间复杂度的图遍历。 * Cypher 查询语言:ISO 标准化图查询语言,采用 ASCII-Art 模式匹配语法,支持可变长度路径、子图查询、聚合与更新混合事务。 * ACID 事务:支持完整事务、集群高可用,可承载企业级负载。 * 丰富生态:内置 Graph Data Science (GDS)

【火】Spatial Joy 2025 全球 AR&AI 赛事:开发者要的资源、玩法、避坑攻略都在这

【火】Spatial Joy 2025 全球 AR&AI 赛事:开发者要的资源、玩法、避坑攻略都在这

Spatial Joy 2025 Rokid乐奇 全球 AR&AI 开发大赛 值不值得参加?不少参加过连续两届 Rokid乐奇 赛事的老兵,纷纷表示非常值得参加。 先说最实在的——奖金。 AR赛道分为应用和游戏两个赛道,金奖各20万人民币,而且是现金!交完税全是你自己的!这还不够,AR赛道总共设了27个奖项,据我打听到的往年数据,能正常跑进初赛的作品大概就60-70个,这意味着获奖比例相当高。 20万就封顶了吗?远远没有!亚马孙科技给使用Kiro并获奖的开发者,在原奖金基础上再加20%现金奖励! AI赛道同样设置了27个奖项,奖金从1万到5万不等,主要以智能体开发为主,支持市面上所有智能体平台的适配。也就是说,你之前做的智能体微调一下就能参赛! 更重要的是,现在正是智能眼镜行业爆发前夜。据我观察,未来2-3年将是空间计算应用落地的关键窗口期,提前布局的开发者将占据绝对先发优势。 好了,重磅消息说完,下面是我为大家整理的详细参赛指南: 先给开发者交个底:这赛事值得花时间吗? 对技术人来说,一场赛事值不值得冲,就看三点:资源给不给力、

使用 Angular 构建 Java 桌面应用

使用 Angular 构建 Java 桌面应用

本文介绍如何构建一个跨平台的 Java 桌面应用,在原生 Swing 窗口中集成现代化的 Angular Web 界面。 前置条件 要完成本教程,您需要: * Git * Java 17 或更高版本 * Node.js 22.0+ * npm 9+ * 有效的 JxBrowser 许可证(评估版或商业版)。有关许可证的更多信息,请参阅许可指南。 项目设置 本教程示例应用程序的代码与其他示例一起,存储在一个基于 Gradle 的 GitHub 仓库中。 如果您想构建一个基于 Maven 的项目,请参考 Maven 配置指南。如果您希望从头开始构建一个基于 Gradle 的项目,请参考 Gradle 配置指南。 获取代码 要获取代码,请执行以下命令:

阿里云全品类 8 折券限时领,建站 / AI / 存储通用 立即领取