详解RabbitMQ工作模式之RPC通信模式

详解RabbitMQ工作模式之RPC通信模式

目录

RPC通信模式

概述

工作流程

特点

应用场景

代码案例

引入依赖

常量类

编写客户端代码

编写服务端代码

运行程序(先运行客户端,再运行服务端)


RPC通信模式

概述
在RabbitMQ中,RPC模式通过消息队列实现远程调用功能。客户端(生产者)发送消息到消费队列,服务端(消费者)进行消息消费并执行相应的程序,然后将结果发送到回调队列供客户端使用。这是一种双向的生产消费模式,其中客户端既是生产者又是消费者,服务端则专注于处理消息并生成响应。

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

工作流程

1.客户端发送请求:

客户端连接到RabbitMQ服务器。
客户端声明一个用于发送RPC请求的队列(通常是固定的,如rpc_queue)。
客户端创建一个临时的回调队列,并在发送请求时,将回调队列的名称作为消息属性(reply_to)发送给交换机。
客户端为每个请求生成一个唯一的correlation_id,并将其作为消息属性发送,以便在接收响应时能够匹配请求与响应。

2.交换机路由请求:

交换机接收到RPC请求后,根据路由键将请求路由到服务端监听的队列。

3.服务端处理请求:

服务端(消费者)从队列中接收请求。
服务端处理请求,并生成响应。
服务端将响应发送到客户端指定的回调队列,并在消息属性中设置相同的correlation_id。

4.客户端接收响应:

客户端监听其回调队列以接收响应。
当接收到响应时,客户端检查correlation_id以确定响应是否与之前的请求匹配。
如果匹配,客户端处理响应;如果不匹配,客户端可能丢弃该响应。
特点
1.解耦:客户端和服务端之间不需要直接通信,降低了系统间的耦合度。
2.灵活性:支持多种语言和平台之间的远程调用。
3.可扩展性:通过增加服务端(消费者)的数量,可以轻松扩展RPC服务。
4.性能开销:由于涉及到网络传输和消息队列的处理,RPC调用的性能通常低于本地调用。
5.复杂性:需要处理消息队列的可靠性、持久性、消息确认等复杂问题。
6.安全性:远程调用可能面临更多的安全风险,如消息篡改、中间人攻击等。
应用场景
RabbitMQ的RPC通信模式适用于需要远程调用服务的场景,如分布式系统中的服务调用、微服务架构中的服务通信等。通过RabbitMQ的消息队列机制,可以实现跨系统、跨语言的远程调用,提高系统的灵活性和可扩展性。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量类
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //rpc 模式 public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue"; }
编写客户端代码
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; /** * rpc 客户端 * 1. 发送请求 * 2. 接收响应 */ public class RpcClient { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null); //3. 发送请求 String msg = "hello rpc..."; //设置请求的唯一标识 String correlationID = UUID.randomUUID().toString(); //设置请求的相关属性 AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constants.RPC_RESPONSE_QUEUE) .build(); channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes()); //4. 接收响应 //使用阻塞队列, 来存储响应信息 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回调消息: "+ respMsg); if (correlationID.equals(properties.getCorrelationId())){ //如果correlationID校验一致 response.offer(respMsg); } } }; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer); String result = response.take(); System.out.println("[RPC Client 响应结果]:"+ result); } }
编写服务端代码
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * RPC server * 1. 接收请求 * 2. 发送响应 */ public class RpcServer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 接收请求 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body,"UTF-8"); System.out.println("接收到请求:"+ request); String response = "针对request:"+ request +", 响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer); } }
运行程序(先运行客户端,再运行服务端)

可以在管理界面看到其中一个队列中有1条消息

我们可以看到,服务端接收到了消息并给客户端发送了响应,与预期符合。

Read more

【算法详解】理解KMP,真的那么难吗?—— 一篇讲透它的核心思想

【算法详解】理解KMP,真的那么难吗?—— 一篇讲透它的核心思想

🫧 励志不掉头发的内向程序员:个人主页  ✨️ 个人专栏: 《C++语言》《Linux学习》 🌅偶尔悲伤,偶尔被幸福所完善 👓️博主简介: 文章目录 * 前言 * 一、相关概念 * 二、前缀函数 * 三、计算前缀函数 * 四、用前缀函数解决字符串匹配 * 五、kmp 算法模板 * 六、next 数组版本 * 七、周期和循环节 * 总结 前言 本文用尽量详细的语言来讲解说明 kmp 算法内容,学习之前需要知道一点点动态规划的基础,如果不知道最好去了解了解。我们一起来看看算法吧。 一、相关概念 在学习 kmp 算法之前,我们得先提前了解最基本的 “ 动态规划 ” 的知识,否则可能学习的时候会有一些困难,因为它的原理类似于动态规划。 字符串: * 用字符构成的的序列就是字符串。 这个概念很简单,但是我们这里有个小技巧:就和动态规划那样,

By Ne0inhk
【数据结构与算法】21.合并两个有序链表(LeetCode)

【数据结构与算法】21.合并两个有序链表(LeetCode)

文章目录 * 合并两个有序链表:高效算法解析与实现 * 问题描述 * 核心思路:双指针尾插法 * 完整代码实现 * 关键点解析 * 1. 边界条件处理 * 2. 头节点初始化 * 3. 节点比较与插入 * 4. 剩余节点处理 * 常见错误与修正 * 优化方案:哨兵节点 * 算法应用场景 * 总结 * 总结 合并两个有序链表:高效算法解析与实现 链表合并是数据结构中的经典问题,在算法面试和实际开发中经常出现。本文将深入解析如何高效合并两个有序链表,并展示C语言的实现方案。 问题描述 给定两个升序排列的链表list1和list2,要求将它们合并为一个新的升序链表并返回。新链表应该通过拼接给定链表的节点来完成。 示例: 输入:list1 = [1,2,4], list2 = [1,3,4] 输出:[1,1,2,3,4,4] 核心思路:

By Ne0inhk

【学习记录】使用 John the Ripper 和 Hashcat破解 RAR、ZIP 与 7z 文件密码(Windows教程)

文章目录 * 📌 引言 * 📦 支持类型 * 🛠️ 所需工具下载地址(Windows 官方编译版) * 额外工具 * 🔐 一、破解RAR 压缩文件密码 * 步骤 1:获取RAR 文件的加密哈希值 * 步骤 2:将正确的哈希值复制到 `hash.txt` 文件 * 步骤 3:在 Hashcat 官方 wiki 查找 `-m` 哈希模式 ID * 步骤 4: 使用 Hashcat 破解哈希值 * 📁 二、破解ZIP 压缩文件密码 * 步骤 1:提取 ZIP 文件的加密哈希值 * 步骤 2:将正确的哈希值复制到 `hash.txt` 文件 * 步骤

By Ne0inhk
Flutter 三方库 collection — 鸿蒙应用全方位集合操作与算法增强利器,实现鸿蒙深度适配下的高效容器过滤与优先级队列实战全解析(适配鸿蒙 HarmonyOS Next ohos)

Flutter 三方库 collection — 鸿蒙应用全方位集合操作与算法增强利器,实现鸿蒙深度适配下的高效容器过滤与优先级队列实战全解析(适配鸿蒙 HarmonyOS Next ohos)

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net。 Flutter 三方库 collection — 鸿蒙应用全方位集合操作与算法增强利器,实现鸿蒙深度适配下的高效容器过滤与优先级队列实战全解析 前言 在鸿蒙(OpenHarmony)应用开发中,数据结构的选择往往决定了逻辑的成败。当标准的 List、Set、Map 无法满足更高级的需求(例如:需要一个自动按优先级排序的任务队列,或者需要判断两个深度嵌套的 Map 是否完全一致)时,开发者就需要引入更强大的集合支持。 collection 是 Dart 官方维护的最核心基础库之一。它不仅补充了大量缺失的容器类型(如 PriorityQueue、Heap),还为原生集合提供了极其丰富的扩展工具类(如 ListEquality、CanonicalizedMap)。在 Flutter for OpenHarmony 的底层架构实践中,它是处理复杂业务逻辑、优化检索效率的必备“基石”。 一、原理解析 / 概念介绍

By Ne0inhk