RabbitMQ 发布确认模式
概述
发布确认模式用于确保消息已经被正确地发送到 RabbitMQ 服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。

消息丢失问题
作为消息中间件,都会面临消息丢失的问题。消息丢失大概分为三种情况:
- 生产者问题。因为应用程序故障、网络抖动等各种原因,生产者没有成功向 Broker 发送消息。
- 消息中间件自身问题。生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失。
- 消费者问题。Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 Broker 将消费失败的消息从队列中删除了。

RabbitMQ 也对上述问题给出了相应的解决方案。问题 2 可通过持久化机制解决。问题 3 可采用消息应答机制。针对问题 1,可采用发布确认(Publisher Confirms)机制实现。
发布确认的三种模式
RabbitMQ 的发布确认模式主要有三种形式:单条确认、批量确认和异步确认。
单条确认(Single Publisher Confirm)
特点:在发布一条消息后,等待服务器确认该消息是否成功接收。
优点:实现简单,每条消息的确认状态清晰。
缺点:性能开销较大,特别是在高并发的场景下,因为每条消息都需要等待服务器的确认。
批量确认(Batch Publisher Confirm)
特点:允许在一次性确认多个消息是否成功被服务器接收。
优点:在大量消息的场景中可以提高效率,因为可以减少确认消息的数量。
缺点:当一批消息中有一条消息发送失败时,整个批量确认失败。此时需要重新发送整批消息,但不知道是哪条消息发送失败,增加了调试和处理的难度。
异步确认(Asynchronous Confirm)
特点:通过回调函数处理消息的确认和未确认事件,更加灵活。
优点:在异步场景中能够更好地处理消息的状态,提高了系统的并发性能和响应速度。
缺点:实现相对复杂,需要处理回调函数的逻辑和状态管理。
实现步骤
- 设置通道为发布确认模式:在生产者发送消息之前,需要将通道设置为发布确认模式。这可以通过调用 channel.confirmSelect() 方法来实现。
- 发送消息并等待确认:生产者发送消息时,每条消息都会分配一个唯一的、递增的整数 ID(DeliveryTag)。生产者可以通过调用 channel.waitForConfirms() 方法来等待所有已发送消息的确认,或者通过其他方式处理确认回调。
- 处理确认回调:为了处理确认回调,需要创建一个 ConfirmListener 接口的实现。在实现的 handleAck() 方法中,可以处理成功接收到确认的消息的逻辑;在 handleNack() 方法中,可以处理未成功接收到确认的消息的逻辑。
应用场景
发布确认模式适用于对数据安全性要求较高的场景,如金融交易、订单处理等。在这些场景中,消息的丢失或重复都可能导致严重的业务问题。通过使用发布确认模式,可以确保消息被正确地发送到 RabbitMQ 服务器,并被成功接收和持久化,从而提高了系统的可靠性和稳定性。
代码案例
引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.21.0</version>
</dependency>
常量类
public class Constants {
public static final String HOST = "47.98.109.138";
public static final int PORT = 5672;
public static final String USER_NAME = "study";
public static final String PASSWORD = "study";
public static final String VIRTUAL_HOST = "aaa";
public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
public static final ;
}
单条确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 100;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
publishingMessagesIndividually();
}
private static void publishingMessagesIndividually() throws Exception {
try (Connection createConnection()) {
connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, , , , );
System.currentTimeMillis();
( ; i < MESSAGE_COUNT; i++) {
+ i;
channel.basicPublish(, Constants.PUBLISHER_CONFIRMS_QUEUE1, , msg.getBytes());
channel.waitForConfirmsOrDie();
}
System.currentTimeMillis();
System.out.printf(, MESSAGE_COUNT, end - start);
}
}
}
运行结果

测试结果显示,以发送消息条数为 100 条为例,单条确认模式耗时较长。
批量确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 10000;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
publishingMessagesInBatches();
}
private static void publishingMessagesInBatches() throws Exception {
( createConnection()) {
connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, , , , );
System.currentTimeMillis();
;
;
( ; i < MESSAGE_COUNT; i++) {
+ i;
channel.basicPublish(, Constants.PUBLISHER_CONFIRMS_QUEUE2, , msg.getBytes());
outstandingMessageCount++;
(outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie();
outstandingMessageCount = ;
}
}
(outstandingMessageCount > ) {
channel.waitForConfirmsOrDie();
}
System.currentTimeMillis();
System.out.printf(, MESSAGE_COUNT, end - start);
}
}
}
运行结果

测试结果显示,以发送消息条数为 10000 条为例,批量确认模式比单条确认模式快很多。
异步确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 10000;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
handlingPublisherConfirmsAsynchronously();
}
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
try ( createConnection()) {
connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, , , , );
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet( <>());
System.currentTimeMillis();
channel.addConfirmListener( () {
IOException {
(multiple) {
confirmSeqNo.headSet(deliveryTag + ).clear();
} {
confirmSeqNo.remove(deliveryTag);
}
}
IOException {
(multiple) {
confirmSeqNo.headSet(deliveryTag + ).clear();
} {
confirmSeqNo.remove(deliveryTag);
}
}
});
( ; i < MESSAGE_COUNT; i++) {
+ i;
channel.getNextPublishSeqNo();
channel.basicPublish(, Constants.PUBLISHER_CONFIRMS_QUEUE3, , msg.getBytes());
confirmSeqNo.add(seqNo);
}
(!confirmSeqNo.isEmpty()) {
Thread.sleep();
}
System.currentTimeMillis();
System.out.printf(, MESSAGE_COUNT, end - start);
}
}
}
运行结果

测试结果显示,以发送消息条数为 10000 条为例,异步确认模式非常快。
对比批量确认和异步确认模式

测试结果表明,异步确认模式比批量确认模式快很多。