Spring Boot 集成 Eclipse Mosquitto

Spring Boot 集成 Eclipse Mosquitto

文章目录

添加 MQTT 客户端依赖

在 Spring Boot 项目的 pom.xml 中添加 Eclipse Paho MQTT 客户端依赖(主流的 MQTT Java 客户端):

<!-- MQTT 客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

配置 MQTT 连接参数

application.yml(或 application.properties)中配置 Mosquitto 连接信息:

mqtt:# 是否启用enable:true# Mosquitto 服务地址(非加密端口),若启用 TLS 加密,使用 ssl://localhost:8883broker: tcp://localhost:1883# 客户端唯一标识(建议加随机数避免冲突)client-id: springboot-mqtt-client # 认证用户名(Mosquitto 启用认证时必填)username: user1 # 认证密码password:123456# 默认 QoS 等级(0/1/2)defalut-qos:1# 心跳间隔(秒)keep-alive:60

实现 MQTT 客户端(发布 + 订阅)

MQTT 客户端配置类

配置类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.util.StringUtils;@Slf4j@Configuration@ConditionalOnBean(MqttProperties.class)publicclassMqttConfig{privatefinalMqttProperties mqttProp;publicMqttConfig(MqttProperties mqttProp){this.mqttProp = mqttProp;}/** * 创建 MQTT 客户端实例 */@BeanpublicMqttClientmqttClient()throwsMqttException{// 客户端 ID 建议添加随机数,避免重复连接String clientIdWithRandom = mqttProp.getClientId()+"_"+System.currentTimeMillis();MqttClient client =newMqttClient(mqttProp.getBroker(), clientIdWithRandom,newMemoryPersistence());// 配置连接参数MqttConnectOptions options =newMqttConnectOptions();if(StringUtils.hasText((mqttProp.getUsername()))) options.setUserName(mqttProp.getUsername());if(StringUtils.hasText((mqttProp.getPassword()))) options.setPassword(mqttProp.getPassword().toCharArray()); options.setKeepAliveInterval(mqttProp.getKeepAlive());// 自动重连 options.setAutomaticReconnect(true);// 不清除会话(保留订阅关系和未确认消息) options.setCleanSession(false);// 连接回调(处理连接状态) client.setCallback(newMqttCallback(){/** * 连接断开时触发,可在此实现重连逻辑 */@OverridepublicvoidconnectionLost(Throwable cause){ log.error("MQTT 连接断开,原因:{}", cause.getMessage());}/** * 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库) */@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{// 接收消息回调(订阅的主题有消息时触发)String content =newString(message.getPayload()); log.debug("收到消息 - 主题:{},内容:{}", topic, content);// TODO 业务逻辑}/** * 消息发布完成后触发,可用于确认消息已送达 */@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){// 消息发布完成回调try{ log.debug("消息发布成功,主题:{}", token.getTopics()[0]);}catch(Exception e){ log.error("", e);}}});// 连接到 Mosquitto client.connect(options); log.info("MQTT 连接成功:{}", mqttProp.getClientId());return client;}}

配置实体类

importlombok.Data;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.cloud.context.config.annotation.RefreshScope;importorg.springframework.stereotype.Component;@Data@Component@RefreshScope@ConfigurationProperties(prefix ="mqtt")@ConditionalOnProperty(name ="mqtt.enable", havingValue ="true")publicclassMqttProperties{/** * 是否启用 */privateboolean enable;/** * Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883 */privateString broker;/** * 客户端唯一标识(建议加随机数避免冲突) */privateString clientId;/** * 认证用户名(Mosquitto 启用认证时必填) */privateString username;/** * 认证密码 */privateString password;/** * 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2 */privateint defaultQos;/** * 心跳间隔(秒) */privateint keepAlive =60;}

发布和订阅工具类

消息订阅工具类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息订阅工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttSubscriber{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttSubscriber(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 订阅指定主题 * @param topic 主题(支持通配符,如 sensor/+) */publicvoidsubscribe(String topic)throwsMqttException{subscribe(topic, mqttProp.getDefaultQos());}/** * 订阅指定主题(自定义QoS) * @param topic 主题 * @param qos QoS等级 */publicvoidsubscribe(String topic,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();} mqttClient.subscribe(topic, qos); log.info("已订阅主题:{},QoS等级:{}", topic, qos);}/** * 取消订阅主题 * @param topic 主题 */publicvoidunsubscribe(String topic)throwsMqttException{ mqttClient.unsubscribe(topic); log.info("已取消订阅主题:{}", topic);}}

消息发布工具类

importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息发布工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttPublisher{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttPublisher(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 发布消息到指定主题 * @param topic 主题 * @param content 消息内容 */publicvoidpublish(String topic,String content)throwsMqttException{publish(topic, content, mqttProp.getDefaultQos());}/** * 发布消息到指定主题(自定义QoS) * @param topic 主题 * @param content 消息内容 * @param qos QoS等级 */publicvoidpublish(String topic,String content,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();// 若断开连接,尝试重连} log.debug("发布消息,主题:{},内容:{}, QoS等级:{}", topic, content, qos);MqttMessage message =newMqttMessage(content.getBytes()); message.setQos(qos); mqttClient.publish(topic, message);}}

测试 MQTT 功能

创建一个测试控制器,验证消息发布和订阅:

importcom.blackcrow.test.mqtt.config.MqttPublisher;importcom.blackcrow.test.mqtt.config.MqttSubscriber;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMqttTestController{@AutowiredprivateMqttPublisher mqttPublisher;@AutowiredprivateMqttSubscriber mqttSubscriber;@Value("${mqtt.default-topic:topic/temp}")privateString defaultTopic;/** * 订阅主题 */@GetMapping("/subscribe")publicStringsubscribe(@RequestParam(required =false)String topic){try{String targetTopic = topic !=null? topic : defaultTopic; mqttSubscriber.subscribe(targetTopic);return"订阅成功:"+ targetTopic;}catch(MqttException e){return"订阅失败:"+ e.getMessage();}}/** * 发布消息 */@GetMapping("/publish")publicStringpublish(@RequestParam(required =false)String topic,@RequestParamString message){try{String targetTopic = topic !=null? topic : defaultTopic; mqttPublisher.publish(targetTopic, message);return"发布成功:主题="+ targetTopic +",消息="+ message;}catch(MqttException e){return"发布失败:"+ e.getMessage();}}}

Read more

《C#上位机开发从门外到门内》3-5:基于FastAPI的Web上位机系统

《C#上位机开发从门外到门内》3-5:基于FastAPI的Web上位机系统

文章目录 * 一、项目概述 * 二、系统架构设计 * 三、前后端开发 * 四、数据可视化 * 五、远程控制 * 六、系统安全性与稳定性 * 七、性能优化与测试 * 八、实际应用案例 * 九、结论 随着互联网技术的快速发展,Web上位机系统在工业自动化、智能家居、环境监测等领域的应用日益广泛。基于FastAPI或Flask的Web上位机系统,凭借其高效、灵活和易于扩展的特点,成为当前研究和应用的热点。本文将详细探讨基于FastAPI和Flask的Web上位机系统的设计与实现,涵盖系统架构、前后端开发、数据可视化、远程控制、安全性、性能优化以及实际应用案例等方面,旨在为相关领域的研究人员和工程技术人员提供参考和借鉴。 一、项目概述 Web上位机系统是一种通过网络实现对远程设备或环境进行实时监控和控制的系统。其核心目标是通过高效的数据传输和处理,确保监控的实时性和准确性,从而实现对远程设备的有效管理和控制。基于FastAPI或Flask的Web上位机系统利用Python的Web框架,通过互联网或局域网实现数据的传输和通信,具有广泛的应用前景。 Fa

By Ne0inhk
Spring Boot + jQuery 前后端分离图书管理系统:从接口设计到问题排查

Spring Boot + jQuery 前后端分离图书管理系统:从接口设计到问题排查

图书管理系统 1.1 准备前端代码 在本地想要的可以去我的gitee中下载 library 的相关前端代码 1.2 约定前后端交互接口 需求分析 图书管理系统是⼀个相对较大一点的案例,咱们先实现其中的⼀部分功能. 用户登录 1. 登录接口 2. 图书列表展示 字段说明: 字段说明id图书 IDbookName图书名称author作者count数量price定价publish图书出版社status图书状态 1 - 可借阅 其他 - 不可借阅statusCN图书状态中文含义 3.4.3 服务器代码 创建图书类 BookInfo @Data public class BookInfo { //图书ID private Integer id; //书名 private String bookName; //作者 private String

By Ne0inhk

RuoYi-Vue3跨平台开发实践:从Web到桌面的无缝迁移方案

RuoYi-Vue3跨平台开发实践:从Web到桌面的无缝迁移方案 【免费下载链接】RuoYi-Vue3:tada: (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统 项目地址: https://gitcode.com/GitHub_Trending/ruo/RuoYi-Vue3 你是否曾为Web应用在特定场景下的局限性而困扰?是否希望将成熟的企业级管理系统快速转化为功能完备的桌面应用?本文将为你揭秘如何通过Electron框架,将RuoYi-Vue3权限管理系统改造为跨平台桌面应用,实现技术栈的平滑迁移和部署效率的显著提升。 痛点分析:为什么需要桌面化改造 Web应用的现实局限 在传统的Web部署模式下,RuoYi-Vue3虽然功能强大,但在以下场景中仍显不足: 场景问题描述桌面化优势离线使用依赖网络连接支持本地运行系统集成访问权限受限深度系统集成用户体验浏览器限制原生应用体验安全性数据暴露风险本地数据存储 跨平台开发的价值体现 通过Electr

By Ne0inhk
Web-Check+cpolar:全方位检查网站还能随时随地访问,太方便了!

Web-Check+cpolar:全方位检查网站还能随时随地访问,太方便了!

文章目录 * 前言 * 1.关于Web-Check * 2.功能特点 * 3.安装Docker * 4.创建并启动Web-Check容器 * 5.本地访问测试 * 6.公网远程访问本地Web-Check * 7.内网穿透工具安装 * 8.创建远程连接公网地址 * 9.使用固定公网地址远程访问 前言 Web-Check 能分析网站的 IP 信息、SSL 证书、DNS 记录、性能和安全配置等,适合网站开发者、运维和安全人员使用,优点是信息全面,能一键获取网站多维度数据。 使用时发现它对新手很友好,操作简单,不过检测结果需要一定专业知识解读,建议结合实际需求重点关注关键指标,如开放端口和 SSL 配置。 但它默认只能在局域网内使用,要是想和异地团队共享检测结果,或者在外网随时查看网站状态,就很不方便,得依赖复杂的网络配置。 而搭配 cpolar 后,能生成公网访问地址,

By Ne0inhk