【RabbitMQ】工作模式实现

【RabbitMQ】工作模式实现

目录

一、Work Queues (工作队列模式)

简单模式在这个系列第一个文章,上手程序就是一个Simple (简单模式)的实现。

⼯作队列模式⽀持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收。

每个工作模式的实现,都先需要引入RabbitMQ的依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>

1.1 生产者

生产者:

  1. 创建连接
  2. 创建Channel
  3. 声明⼀个队列Queue
  4. 生产消息
  5. 释放资源
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducerDemo{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明信道 channel.queueDeclare("workQueues",true,false,true,null);//声明队列,发送消息for(int i =0; i <10; i++){String msg ="hello workQueues"+i; channel.basicPublish("","workQueues",null,msg.getBytes());}System.out.println("消息发送成功");//资源释放 channel.close(); connection.close();}}

1.2 消费者

我们创建两个消费者模拟。
消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源,为了造成两个消费者竞争,我们先不释放资源。
packageorg.example.rabbitmq.workqueues;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//建立连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("workQueues",false,false,true,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("workQueues",true,consumer);Thread.sleep(100);////释放资源//channel.close();//connection.close();}}

效果:

二、Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了⼀个Exchange⻆⾊。

Exchange 常⻅有三种类型,分别代表不同的路由规则,也就分别对应不同的⼯作模式:

  1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
  2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

2.1 生产者

生产者

  1. 创建连接
  2. 创建信道
  3. 声明交换机
  4. 声明两个队列
  5. 交换机与队列进行绑定
  6. 生产消息
  7. 释放资源

声明交换机的方法是Channel类下的exchangeDeclare方法

  • exchange – the name of the exchange,交换机名称
  • type – the exchange type ,交换机类型
  • durable – true if we are declaring a durable exchange (the exchange will survive a server restart),是否可持久化
  • autoDelete – true if the server should delete the exchange when it is no longer in use ,是否自动删除
  • internal – true if the exchange is internal, i.e. can’t be directly published to by a client,是否内部使用,内部使用客户端发不进去消息
  • arguments – other properties (construction arguments) for the exchange,参数
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("fanout.exchange",BuiltinExchangeType.FANOUT,false);//4. 声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null); channel.queueDeclare("fanout.queue2",true,false,false,null);//5.交换机与队列绑定 channel.queueBind("fanout.queue1","fanout.exchange",""); channel.queueBind("fanout.queue2","fanout.exchange","");//6. 生产消息for(int i =0; i <10; i++){String msg ="hello fanout"+i; channel.basicPublish("fanout.exchange","",null,msg.getBytes());}System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

2.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("fanout.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("fanout.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

三、Routing(路由模式)

Routing(路由模式):
队列和交换机的绑定,不能是任意的绑定了,⽽是要指定⼀个BindingKey (RoutingKey的⼀种),消息的发送⽅在向Exchange发送消息时,也需要指定消息的RoutingKey,Exchange也不再把消息交给每⼀个绑定的key,⽽是根据消息的RoutingKey进⾏判断,只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致,才会接收到消息。

3.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为direct
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("direct.exchange",BuiltinExchangeType.DIRECT,true);//4. 声明队列 channel.queueDeclare("direct.queue1",true,false,false,null); channel.queueDeclare("direct.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("direct.queue1","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","a"); channel.queueBind("direct.queue2","direct.exchange","b"); channel.queueBind("direct.queue2","direct.exchange","c");//6. 生产消息String msgA ="hello direct routingKey is a"; channel.basicPublish("direct.exchange","a",null,msgA.getBytes());String msgB ="hello direct routingKey is b"; channel.basicPublish("direct.exchange","b",null,msgB.getBytes());String msgC ="hello direct routingKey is c"; channel.basicPublish("direct.exchange","c",null,msgC.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

3.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.direct;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("direct.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("direct.queue1",consumer);/* //释放资源 channel.close(); connection.close();*/}}

结果:

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. topics 模式使⽤的交换机类型为topic(Routing模式使⽤的交换机类型为direct)
  2. topic 类型的交换机在匹配规则上进⾏了扩展,Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配)。

在topic类型的交换机在匹配规则上,有些要求:

  1. RoutingKey 是⼀系列由点( . )分隔的单词,⽐如 " stock.usd.nyse “,” nyse.vmw “,” quick.orange.rabbit "
  2. BindingKey 和 RoutingKey⼀样,也是点( . )分割的字符串。
  3. Binding Key中可以存在两种特殊字符串,⽤于模糊匹配
    3.1 * 表⽰⼀个单词
    3.2 # 表⽰多个单词(0-N个)

4.1 生产者

生产者:

  1. 创建连接
  2. 创建信道
  3. 声明交换机,类型为topic
  4. 声明队列
  5. 队列与交换机绑定,绑定的时候加上BindingKey参数
  6. 生产消息,消息发送的时候指定routingKey
  7. 释放资源
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明交换机 channel.exchangeDeclare("topic.exchange",BuiltinExchangeType.TOPIC,true);//4. 声明队列 channel.queueDeclare("topic.queue1",true,false,false,null); channel.queueDeclare("topic.queue2",true,false,false,null);//5. 队列与交换机绑定 channel.queueBind("topic.queue1","topic.exchange","*.a.*"); channel.queueBind("topic.queue2","topic.exchange","*.*.b"); channel.queueBind("topic.queue2","topic.exchange","c.#");//6. 生产消息String msgA ="hello topic routingKey is word.a.word"; channel.basicPublish("topic.exchange","word.a.word",null,msgA.getBytes());String msgB ="hello topic routingKey is word.word.b"; channel.basicPublish("topic.exchange","word.word.b",null,msgB.getBytes());String msgC ="hello topic routingKey is c.word.word.word.word.b"; channel.basicPublish("topic.exchange","c.word.word.word.word.b",null,msgC.getBytes());String msgD ="hello topic routingKey is c.a.b"; channel.basicPublish("topic.exchange","c.a.b",null,msgD.getBytes());System.out.println("发送消息成功");//7. 释放资源 channel.close(); connection.close();}}

4.2 消费者

两个消费者分别消费两个队列的消息。

消费者:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源。
packageorg.example.rabbitmq.topics;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明队列 channel.queueDeclare("topic.queue1",true,false,false,null);//消费消息DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Consumer1 接收到消息: "+newString(body));}}; channel.basicConsume("topic.queue1",consumer);//释放资源// channel.close();// connection.close();}}

结果:

五、RPC通信

RPC(Remote Procedure Call),即远程过程调⽤。它是⼀种通过⽹络从远程计算机上请求服务,⽽不需要了解底层⽹络的技术。类似于Http远程调⽤。
RabbitMQ实现RPC通信的过程,⼤概是通过两个队列实现⼀个可回调的过程。

⼤概流程如下:

  1. 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列。
  2. 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息,⼀旦收到响应,客⼾端会检查消息的 correlationId 属性,以确保它是所期望的响应。

5.1 客户端

客户端:

  1. 声明两个队列,包含回调队列 replyQueueName,声明本次请求的唯⼀标志 corrId
  2. 将 replyQueueName 和 corrId 配置到要发送的消息队列中
  3. 使⽤阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 阻塞队列有消息后,主线程被唤醒,打印返回内容
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeoutException;publicclassClient{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//3. 声明队列 channel.queueDeclare("rpc.request.queue",true,false,false,null); channel.queueDeclare("rpc.response.queue",true,false,false,null);//4. 发送消息//设置请求标识String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().correlationId(correlationId).replyTo("rpc.response.queue").build();String msg ="hello rpc"; channel.basicPublish("","rpc.request.queue", properties,msg.getBytes());//5. 接收响应//使用阻塞队列存储响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Client 接收到消息: "+newString(body));//判断唯⼀标识正确, 放到阻塞队列中if(correlationId.equals(properties.getCorrelationId())){ response.offer(newString(body));}System.out.println("Client 接收到消息: "+newString(body));}}; channel.basicConsume("rpc.response.queue",true,consumer);// 获取响应的结果String result = response.take();System.out.println(" [RPC_Client] Result:"+ result);}}

5.2 服务器

服务器:

  1. 接收消息
  2. 根据消息内容进⾏响应处理,把应答结果返回到回调队列中
packageorg.example.rabbitmq.rpc;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassServer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1. 创建连接ConnectionFactory connectionFactory =newConnectionFactory(); connectionFactory.setHost("101.43.47.137");//ip地址 connectionFactory.setPort(5672);//默认端口号 connectionFactory.setUsername("study");//用户 connectionFactory.setPassword("study");//用户密码 connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2. 创建信道Channel channel = connection.createChannel();//接收请求//每次接受一条 channel.basicQos(1);DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Server 接收到请求: "+newString(body));//响应请求String response ="响应";AMQP.BasicProperties props =newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).replyTo("rpc.request.queue").build(); channel.basicPublish("","rpc.response.queue",props,response.getBytes());//手动确认 channel.basicAck(envelope.getDeliveryTag(),false);}}; channel.basicConsume("rpc.request.queue",false,consumer);}}

结果:

六、Publisher Confirms(发布确认)

消息中间件都会有消息丢失的问题发生,大概分为以下三种丢失情况:

  1. 生产者问题:因为应用故障,网络等问题,生产者没有成功向消息中间件发送消息;
  2. 消息中间件问题:生产者成功发送了消息,消息中间件自己原因导致消息丢失;
  3. 消费者问题:消费者消费消息时,处理出现问题,导致消费者 消费失败的 消息 从消息队列中删除了。

RabbitMQ针对上面三种情况给出的解决方案:

  1. 针对生产者问题:采取Publisher Confirms(发布确认)机制 解决;
  2. 针对消息中间件问题:通过持久化机制解决;
  3. 针对消费者问题:通过消息应答机制解决;

生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始);
一旦消息被投递到所有匹配的队列之后,RabbitMO就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出.
brokerl(消息中间件)回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,
此外 broker 也可以设置 channel.basicAck() 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理.

6.1 Publishing Messages Individually(单独确认)

跟生产者发送消息,只有调用Channel类confirmSelect()设置信道为confirm模式,和Channel类waitForConfirmsOrDie()方法等待手动确认。

privatestaticvoidpublishingMessagesIndividually()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue1",true,false,false,null);long start =System.currentTimeMillis();//发送消息for(int i =0; i <200; i++){String msg ="Publishing Messages Individually "+ i; channel.basicPublish("","publish.confirm.queue1",null,msg.getBytes());//等待确认 channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages Individually(单独确认) 发送200条消息耗时 "+(end - start));}}

结果:

可以发现,发送200条消息,耗时很⻓。

观察上⾯代码,会发现这种策略是每发送⼀条消息后就调⽤ channel.waitForConfirmsOrDie() ⽅法,之后 等待服务端的确认,这实际上是⼀种串⾏同步等待的⽅式。
尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调⽤Linux内核的fsync⽅法)。
但是发布确认机制是⽀持异步的。可以⼀边发送消息,⼀边等待消息确认。

6.2 Publishing Messages in Batches(批量确认)

每发送⼀批消息后,调⽤ channel.waitForConfirms ⽅法,等待服务器的确认返回。

跟单独确认区别就是,发送到一定消息再进行等待确认。

privatestaticvoidpublishingMessagesInBatches()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue2",true,false,false,null);long start =System.currentTimeMillis();//发送消息int batchSize =100;int flag =0;for(int i =0; i <200; i++){String msg ="Publishing Messages in Batches "+ i; channel.basicPublish("","publish.confirm.queue2",null,msg.getBytes());//批量 等待确认if(flag == batchSize){ channel.waitForConfirmsOrDie(5000); flag =0;} flag++;}if(flag >0){ channel.waitForConfirmsOrDie(5000);}long end =System.currentTimeMillis();System.out.println("Publishing Messages in Batches(批量确认) 发送200条消息耗时 "+(end - start));}}

结果:

相⽐于单独确认策略,批量确认极⼤地提升了confirm的效率,
缺点是出现Basic.Nack或者超时时,我们不清楚具体哪条消息出了问题。客⼾端需要将这⼀批次的消息全部重发,这会带来明显的重复消息数量。
当消息经常丢失时,批量确认的性能应该是不升反降的。

6.3 Handling Publisher Confirms Asynchronously(异步确认)

提供⼀个回调⽅法,服务端确认了⼀条或者多条消息后客⼾端会回这个⽅法进⾏处理。

异步confirm⽅法的编程实现最为复杂。Channel 接⼝提供了⼀个⽅法 addConfirmListener,这个⽅法可以添加ConfirmListener 回调接⼝。

ConfirmListener 接⼝中包含两个⽅法:handleAck(long deliveryTag, boolean multiple) handleNack(long deliveryTag, boolean multiple) ,分别对应处理RabbitMQ发送给⽣产者的 ack 和 nack。

deliveryTag 表⽰发送消息的序号,multiple 表⽰是否批量确认。我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合。
当收到RabbitMQ的confirm 回调时,从集合中删除对应的消息。当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号。

我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合。

  1. 当收到ack时,从序列中删除该消息的序号。如果为批量确认消息,表⽰⼩于等于当前序号deliveryTag的消息都收到了,则清除对应集合
  2. 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作。
privatestaticvoidhandlingPublisherConfirmsAsynchronously()throwsIOException,TimeoutException,InterruptedException{try(Channel channel =createChannel()){//设置信道为confirm模式 channel.confirmSelect();//声明队列 channel.queueDeclare("publish.confirm.queue3",true,false,false,null);//有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号SortedSet<Long> confirmSet =Collections.synchronizedSortedSet(newTreeSet<>());long start =System.currentTimeMillis();//监听 channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰这批序号的消息都已经被ack了if(multiple){ confirmSet.headSet(deliveryTag+1).clear();}else{//单独确认 confirmSet.remove(deliveryTag);}//根据业务处理}});//发送消息int flag =0;for(int i =0; i <200; i++){String msg ="Handling Publisher Confirms Asynchronously "+ i;//得到下次发送消息的序号, 从1开始long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("","publish.confirm.queue3",null,msg.getBytes());//存入集合 confirmSet.add(nextPublishSeqNo);}while(confirmSet.isEmpty()){Thread.sleep(20);}long end =System.currentTimeMillis();System.out.println("Handling Publisher Confirms Asynchronously(异步确认) 发送200条消息耗时 "+(end - start));}}

结果:

Read more

基于MATLAB的障碍感知无人机导航实现,建筑障碍物、采用26连通域的三维A星算法路径规划、路径平滑优化以及动态无人机轨迹可视化功能

✅作者简介:热爱科研的Matlab仿真开发者,擅长毕业设计辅导、数学建模、数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。 🍎 往期回顾关注个人主页:Matlab科研工作室  👇 关注我领取海量matlab电子书和数学建模资料  🍊个人信条:格物致知,完整Matlab代码获取及仿真咨询内容私信。 🔥 内容介绍 一、背景 在复杂环境中,如城市建筑区域,无人机需要具备可靠的导航能力以避开障碍物,安全高效地到达目标地点。实现障碍感知的无人机导航涉及路径规划、路径优化以及可视化展示等关键技术,这些技术对于提升无人机在复杂场景下的自主飞行能力至关重要。三维 A 星算法结合 26 连通域能够有效处理三维空间中的路径搜索问题,而路径平滑优化可使规划出的路径更适合无人机飞行,动态轨迹可视化则有助于操作人员实时掌握无人机的飞行状态。 二、原理 (一)基于 26 连通域的三维 A 星算法路径规划 1. A 星算法基础:A 星算法是一种启发式搜索算法,用于在图或网格中寻找从起点到目标点的最短路径。其核心思想是通过一个估值函数 f(n)=g(

By Ne0inhk
免费部署openClaw龙虾机器人(经典)

免费部署openClaw龙虾机器人(经典)

前几天出了个免费玩龙虾的详细教程,很多小伙伴觉得不错,但是还有一些新手留言反馈内容不够详细,这次我将重新梳理一遍,做一期更细致的攻略,同时扩展补充配置好之后的推荐(我认为是必要)操作,争取一篇文章让大家可以收藏起来,随时全套参照复用。 先看效果测试 部署完成基础运行效果测试,你可以直接问clawdbot当前的模型: 1.Token平台准备 首先,还是准备好我们可以免费撸的API平台 这里我找到了两个可以免费使用的API,测试之后执行效率还可以,下面将分别进行细致流程拆解。 1.1 硅基流动获取ApiKey (相对免费方案 推荐) 硅基流动地址:https://cloud.siliconflow.cn/i/6T57VxS2 如果有账号的直接登录,没有的注册一个账号,这个认证就送16元,可以直接玩收费模型,真香。认证完成后在API秘钥地方新建秘钥。 硅基流动里面很多模型原来是免费的,有了16元注册礼,很多收费的模型也相当于免费用了,我体验一下了原来配置免费模型还能用,也是值得推荐的。建议使用截图的第一个模型体验一下,我一直用它。 1.2 推理时代

By Ne0inhk
宇树机器人SDK2开发指南:从环境搭建到Demo测试

宇树机器人SDK2开发指南:从环境搭建到Demo测试

本文以宇树 G1 人形机器人为主线,系统介绍 unitree_sdk2(C++)与 unitree_sdk2_python(Python)的完整开发流程,涵盖通信架构原理、环境搭建、依赖安装、Demo 编译运行、网络配置以及常见问题处理,适合具身智能领域的初中级开发者快速上手。 目录 1. SDK2 概述与架构原理 2. 开发环境要求 3. 获取官方 SDK 包 4. 安装依赖与编译 5. 机器人与开发机网络配置 6. 调试并运行 Demo 7. Python SDK Demo 测试 8. 常见问题与解决方案 9. 总结 1. SDK2 概述与架构原理 1.

By Ne0inhk
从零开始使用ISSACLAB训练自己的机器人行走

从零开始使用ISSACLAB训练自己的机器人行走

ISAACLAB入门教程 作者:陈维耀 1. 环境配置 1.1 推荐配置 * 操作系统: Ubuntu 22.04 LTS * 显卡: NVIDIA RTX 4080或以上 1.2 ubuntu 22.04 LTS安装 参考ZEEKLOG的Ubuntu 16.04 LTS安装教程,将其中的ubuntu 16.04镜像文件替换为ubuntu 22.04镜像文件,其他步骤保持不变,建议/home与/usr的硬盘容量均不少于200G。 1.3 安装NVIDIA驱动 根据自身显卡型号与操作系统,选择对应的显卡驱动,建议选择550.xxx.xxx版本的显卡驱动,按照教程进行安装即可,安装完成后在终端输入nvidia-smi,若出现以下信息则表示驱动安装成功: Thu Jun 5

By Ne0inhk