Spring Boot 集成 Eclipse Mosquitto
文章目录
添加 MQTT 客户端依赖
在 Spring Boot 项目的 pom.xml 中添加 Eclipse Paho MQTT 客户端依赖(主流的 MQTT Java 客户端):
<!-- MQTT 客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>配置 MQTT 连接参数
在 application.yml(或 application.properties)中配置 Mosquitto 连接信息:
mqtt:# 是否启用enable:true# Mosquitto 服务地址(非加密端口),若启用 TLS 加密,使用 ssl://localhost:8883broker: tcp://localhost:1883# 客户端唯一标识(建议加随机数避免冲突)client-id: springboot-mqtt-client # 认证用户名(Mosquitto 启用认证时必填)username: user1 # 认证密码password:123456# 默认 QoS 等级(0/1/2)defalut-qos:1# 心跳间隔(秒)keep-alive:60实现 MQTT 客户端(发布 + 订阅)
MQTT 客户端配置类
配置类
importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.util.StringUtils;@Slf4j@Configuration@ConditionalOnBean(MqttProperties.class)publicclassMqttConfig{privatefinalMqttProperties mqttProp;publicMqttConfig(MqttProperties mqttProp){this.mqttProp = mqttProp;}/** * 创建 MQTT 客户端实例 */@BeanpublicMqttClientmqttClient()throwsMqttException{// 客户端 ID 建议添加随机数,避免重复连接String clientIdWithRandom = mqttProp.getClientId()+"_"+System.currentTimeMillis();MqttClient client =newMqttClient(mqttProp.getBroker(), clientIdWithRandom,newMemoryPersistence());// 配置连接参数MqttConnectOptions options =newMqttConnectOptions();if(StringUtils.hasText((mqttProp.getUsername()))) options.setUserName(mqttProp.getUsername());if(StringUtils.hasText((mqttProp.getPassword()))) options.setPassword(mqttProp.getPassword().toCharArray()); options.setKeepAliveInterval(mqttProp.getKeepAlive());// 自动重连 options.setAutomaticReconnect(true);// 不清除会话(保留订阅关系和未确认消息) options.setCleanSession(false);// 连接回调(处理连接状态) client.setCallback(newMqttCallback(){/** * 连接断开时触发,可在此实现重连逻辑 */@OverridepublicvoidconnectionLost(Throwable cause){ log.error("MQTT 连接断开,原因:{}", cause.getMessage());}/** * 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库) */@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{// 接收消息回调(订阅的主题有消息时触发)String content =newString(message.getPayload()); log.debug("收到消息 - 主题:{},内容:{}", topic, content);// TODO 业务逻辑}/** * 消息发布完成后触发,可用于确认消息已送达 */@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){// 消息发布完成回调try{ log.debug("消息发布成功,主题:{}", token.getTopics()[0]);}catch(Exception e){ log.error("", e);}}});// 连接到 Mosquitto client.connect(options); log.info("MQTT 连接成功:{}", mqttProp.getClientId());return client;}}配置实体类
importlombok.Data;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.cloud.context.config.annotation.RefreshScope;importorg.springframework.stereotype.Component;@Data@Component@RefreshScope@ConfigurationProperties(prefix ="mqtt")@ConditionalOnProperty(name ="mqtt.enable", havingValue ="true")publicclassMqttProperties{/** * 是否启用 */privateboolean enable;/** * Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883 */privateString broker;/** * 客户端唯一标识(建议加随机数避免冲突) */privateString clientId;/** * 认证用户名(Mosquitto 启用认证时必填) */privateString username;/** * 认证密码 */privateString password;/** * 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2 */privateint defaultQos;/** * 心跳间隔(秒) */privateint keepAlive =60;}发布和订阅工具类
消息订阅工具类
importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息订阅工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttSubscriber{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttSubscriber(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 订阅指定主题 * @param topic 主题(支持通配符,如 sensor/+) */publicvoidsubscribe(String topic)throwsMqttException{subscribe(topic, mqttProp.getDefaultQos());}/** * 订阅指定主题(自定义QoS) * @param topic 主题 * @param qos QoS等级 */publicvoidsubscribe(String topic,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();} mqttClient.subscribe(topic, qos); log.info("已订阅主题:{},QoS等级:{}", topic, qos);}/** * 取消订阅主题 * @param topic 主题 */publicvoidunsubscribe(String topic)throwsMqttException{ mqttClient.unsubscribe(topic); log.info("已取消订阅主题:{}", topic);}}消息发布工具类
importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.boot.autoconfigure.condition.ConditionalOnBean;importorg.springframework.stereotype.Component;/** * MQTT 消息发布工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)publicclassMqttPublisher{privatefinalMqttClient mqttClient;privatefinalMqttProperties mqttProp;publicMqttPublisher(MqttClient mqttClient,MqttProperties mqttProp){this.mqttClient = mqttClient;this.mqttProp = mqttProp;}/** * 发布消息到指定主题 * @param topic 主题 * @param content 消息内容 */publicvoidpublish(String topic,String content)throwsMqttException{publish(topic, content, mqttProp.getDefaultQos());}/** * 发布消息到指定主题(自定义QoS) * @param topic 主题 * @param content 消息内容 * @param qos QoS等级 */publicvoidpublish(String topic,String content,int qos)throwsMqttException{if(!mqttClient.isConnected()){ mqttClient.reconnect();// 若断开连接,尝试重连} log.debug("发布消息,主题:{},内容:{}, QoS等级:{}", topic, content, qos);MqttMessage message =newMqttMessage(content.getBytes()); message.setQos(qos); mqttClient.publish(topic, message);}}测试 MQTT 功能
创建一个测试控制器,验证消息发布和订阅:
importcom.blackcrow.test.mqtt.config.MqttPublisher;importcom.blackcrow.test.mqtt.config.MqttSubscriber;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMqttTestController{@AutowiredprivateMqttPublisher mqttPublisher;@AutowiredprivateMqttSubscriber mqttSubscriber;@Value("${mqtt.default-topic:topic/temp}")privateString defaultTopic;/** * 订阅主题 */@GetMapping("/subscribe")publicStringsubscribe(@RequestParam(required =false)String topic){try{String targetTopic = topic !=null? topic : defaultTopic; mqttSubscriber.subscribe(targetTopic);return"订阅成功:"+ targetTopic;}catch(MqttException e){return"订阅失败:"+ e.getMessage();}}/** * 发布消息 */@GetMapping("/publish")publicStringpublish(@RequestParam(required =false)String topic,@RequestParamString message){try{String targetTopic = topic !=null? topic : defaultTopic; mqttPublisher.publish(targetTopic, message);return"发布成功:主题="+ targetTopic +",消息="+ message;}catch(MqttException e){return"发布失败:"+ e.getMessage();}}}