
消息确认机制
RabbitMQ 的消息确认机制
生产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常

RabbitMQ 向消费者发送消息之后,就会把这条消息删掉,那么第二种情况,就会造成消息丢失。那么如何确保消费端已经成功接收了,并正确处理了呢?为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。
消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:
• 自动确认:当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存 (或者磁盘) 中删除,而不管消费者是否真正地消费到了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。
• 手动确认:当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地调用 Basic.Ack 命令,回复确认信号后才从内存 (或者磁盘) 中移去消息。这种模式适合对消息可靠性要求比较高的场景。
自动确认
源代码:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:
一是等待投递给消费者的消息。
二是已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

从 RabbitMQ 的 Web 管理平台上,也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数。
Ready: 等待投递给消费者的消息数
Unacked: 已经投递给消费者,但是未收到消费者确认信号的消息数
手动确认
消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的方式,消费者客户端可以调用与其对应的 channel 的相关方法,共有以下三种:
- 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息。可以将其丢弃了。
参数说明:
- deliveryTag: 消息的唯一标识,它是一个单调递增的 64 位的长整型值。deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯一的。当消费者确认 (ack) 一条消息时,必须使用对应的通道上进行确认。
- multiple: 是否批量确认。在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认。值为 true 则会一次性 ack 所有小于或等于指定 deliveryTag 的消息。值为 false,则只确认当前指定 deliveryTag 的消息。

deliveryTag 是 RabbitMQ 中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性。
- 否定确认:Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
参数说明:
- deliveryTag: 参考 channel.basicAck
- requeue: 表示拒绝后,这条消息如何处理。如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果 requeue 参数设置为 false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。
- 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。消费者客户端可以调用 channel.basicNack 方法来实现。
multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
Spring-AMQP 的消息确认机制
下面我们基于 SpringBoot 来演示消息确认机制,不过和 RabbitMQ Java Client 库有一定差异。
Spring-AMQP 对消息确认机制提供了三种策略。

消息确认机制的三种策略的解读:
1. AcknowledgeMode.NONE
这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会自动确认消息,从 RabbitMQ 队列中移除消息。如果消费者处理消息失败,消息可能会丢失。
2. AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。
3. AcknowledgeMode.MANUAL
手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
代码演示
常量类
public class Constants {
public static final String ACK_QUEUE = "ack.queue";
public static final String ACK_EXCHANGE = "ack.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.demo.constant.Constants;
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("ack");
}
}
声明 RabbitTemplate
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
编写生产消息代码
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.constant.Constants;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
return "消息发送成功";
}
}
AcknowledgeMode.NONE(演示)
添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://localhost:5672/
listener:
simple:
acknowledge-mode: none
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
System.out.println("业务处理完成");
}
}
发送消息

消费消息

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
int num = 3/0;
System.out.println("业务处理完成");
}
}
发送消息

消费消息



此时我们可以看到,消息虽然被消费者收到了,但是因为消费时发生异常导致消息没有被正常消费,而且队列中的消息也已经没有了。
AcknowledgeMode.AUTO(演示)
添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://localhost:5672/
listener:
simple:
acknowledge-mode: auto
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
System.out.println("业务处理完成");
}
}
生产消息

消费消息

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
int num = 3/0;
System.out.println("业务处理完成");
}
}
生产消息

消费消息




此时我们可以看到,消费消息时发生异常,导致消息没有被正常消费,而且此时控制台不断地打印日志,deliveryTag 的值不断增加,观察控制界面可以看到队列中始终有一条消息,是因为 AUTO 机制下,如果消费消息时发生异常,此时消费方不会确认消息,此时消息就会重新入队,直到消费方能够正常消费消息而不发生异常。
AcknowledgeMode.MANUAL(演示)
添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://localhost:5672/
listener:
simple:
acknowledge-mode: manual
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
System.out.println("业务处理完成");
}
}
生产消息

消费消息

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
int num = 3/0;
System.out.println("业务处理完成");
}
}
发送消息

消费消息



此时我们看到,MANUAL 消息确认机制下,消费方消费消息时发生异常,没有手动确认,会导致消息在队列中的状态变为 Unacked 状态。
编写消费消息代码 3
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
System.out.println("业务处理完成");
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
生产消息

消费消息

此时我们可以看到,在 MANUAL 消息确认机制下,如果消息手动确认了,就没问题了。
编写消费消息代码 4
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
int num = 3/0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
生产消息

消费消息


此时我们可以看到,当消费消息时,因为发生异常,程序会运行到拒绝消息,但是因为设置了重新入队,会导致控制台不停地打印消息,deliveryTag 的值不断增加。