Flutter for OpenHarmony:mqtt_client 连接 MQTT 代理,实现物联网(IoT)设备实时状态监控(轻量级发布订阅协议) 深度解析与鸿蒙适配指南

Flutter for OpenHarmony:mqtt_client 连接 MQTT 代理,实现物联网(IoT)设备实时状态监控(轻量级发布订阅协议) 深度解析与鸿蒙适配指南

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net

在这里插入图片描述

前言

MQTT (Message Queuing Telemetry Transport) 是一种极轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)、移动应用和车载设备。在智能家居控制、设备状态上报等场景中,APP 往往需要实时接收设备发来的消息。

mqtt_client 是 Dart 生态中最流行的 MQTT 客户端库,支持 MQTT 3.1 和 3.1.1 协议。它能够在 OpenHarmony 应用中稳定运行,帮助开发者轻松构建物联网控制端。

一、概念介绍/原理解析

1.1 基础概念

  • Broker (代理): 消息的转发服务器(如 EMQX, Mosquitto)。
  • Topic (主题): 消息的分类标签(如 home/livingroom/temp)。
  • QoS (服务质量):
    • 0: 最多一次 (Fire and Forget)
    • 1: 至少一次 (Acknowledged delivery)
    • 2: 只有一次 (Exact delivery)

发布: 25°C

转发

订阅主题

呈现

传感器设备

环境主题

MQTT 代理服务器

OpenHarmony 应用

界面显示: 25°C

1.2 进阶概念

在移动端使用 MQTT,通常需要关注断线重连(Keep Alive)和Clean Session(是否接收离线消息)。

二、核心 API/组件详解

2.1 基础用法

创建客户端并连接。

import'package:mqtt_client/mqtt_client.dart';import'package:mqtt_client/mqtt_server_client.dart';// 使用 Server ClientFuture<void>connect()async{final client =MqttServerClient('broker.emqx.io','flutter_client_id'); client.logging(on:false); client.keepAlivePeriod =20;try{await client.connect();print('已连接');}catch(e){print('连接失败: $e'); client.disconnect();}}
在这里插入图片描述

2.2 高级定制

处理连接状态回调和订阅消息。

// 设置回调 client.onConnected =()=>print('Connected'); client.onDisconnected =()=>print('Disconnected'); client.onSubscribed =(topic)=>print('Subscribed to $topic');// 订阅主题 client.subscribe('test/topic',MqttQos.atLeastOnce);// 监听消息 client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final pt =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);print('收到消息: $pt');});
在这里插入图片描述

三、常见应用场景

3.1 场景 1:智能家居控制

用户点击开关,发送控制指令;同时订阅设备状态更新 UI。

// 发送指令:打开客厅灯voidturnOnLight(){final builder =MqttClientPayloadBuilder(); builder.addString('{"state": "ON"}'); client.publishMessage('home/livingroom/light/set',MqttQos.atLeastOnce, builder.payload!);}
在这里插入图片描述

3.2 场景 2:环境数据监测

实时接收温度、湿度传感器上报的数据,并绘制成图表。

client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final pt =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);// 更新图表数据 chartData.add(parseSensorData(pt));});
在这里插入图片描述

3.3 场景 3:实时聊天/通知

虽然 IM 通常用 WebSocket,但在低带宽环境下,MQTT 也是不错的选择。

// 订阅个人消息主题 client.subscribe('users/$myUserId/inbox',MqttQos.exactlyOnce);// 收到消息弹窗 client.updates!.listen((event){// 显示通知栏消息});
在这里插入图片描述

四、OpenHarmony 平台适配

4.1 网络权限

"requestPermissions":[{"name":"ohos.permission.INTERNET"}]

4.2 后台保活

MQTT 需要保持长连接心跳。OpenHarmony 系统可能会在后台冻结应用网络。如果是关键业务,可能需要申请长时任务(Continuous Task)或使用推送服务辅助唤醒。

五、完整示例代码

本示例展示一个简单的 MQTT 控制台,连接公共 Broker,订阅并发送消息。

import'dart:io';import'package:flutter/material.dart';import'package:mqtt_client/mqtt_client.dart';import'package:mqtt_client/mqtt_server_client.dart';voidmain(){runApp(constMaterialApp(home:MqttPage()));}classMqttPageextendsStatefulWidget{constMqttPage({super.key});@overrideState<MqttPage>createState()=>_MqttPageState();}class _MqttPageState extendsState<MqttPage>{// 使用 EMQX 的公共测试服务器final client =MqttServerClient('broker.emqx.io','ohos_client_${DateTime.now().millisecondsSinceEpoch}');String _status ='未连接';finalList<String> _messages =[];finalTextEditingController _msgController =TextEditingController();finalString _topic ='flutter/ohos/test';Future<void>_connect()async{setState(()=> _status ='连接中...'); client.logging(on:true); client.keepAlivePeriod =60; client.onDisconnected =()=>setState(()=> _status ='已断开'); client.onConnected =()=>setState(()=> _status ='已连接');// 设置连接消息final connMess =MqttConnectMessage().withClientIdentifier('ohos_client').withWillTopic('willtopic').withWillMessage('My Will message').startClean().withWillQos(MqttQos.atLeastOnce); client.connectionMessage = connMess;try{await client.connect();}onNoConnectionExceptioncatch(e){// 客户端异常print('Client exception: $e'); client.disconnect();}onSocketExceptioncatch(e){// Socket 异常print('Socket exception: $e'); client.disconnect();}if(client.connectionStatus!.state ==MqttConnectionState.connected){_subscribe();}else{setState(()=> _status ='连接失败'); client.disconnect();}}void_subscribe(){ client.subscribe(_topic,MqttQos.atLeastOnce); client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final payload =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);setState((){ _messages.insert(0,'[收到] $payload');});});}void_publish(){final builder =MqttClientPayloadBuilder(); builder.addString(_msgController.text); client.publishMessage(_topic,MqttQos.exactlyOnce, builder.payload!);setState((){ _messages.insert(0,'[发送] ${_msgController.text}'); _msgController.clear();});}@overridevoiddispose(){ client.disconnect();super.dispose();}@overrideWidgetbuild(BuildContext context){returnScaffold( appBar:AppBar(title:constText('MQTT Client Demo')), body:Column( children:[Container( padding:constEdgeInsets.all(16), color: _status =='已连接'?Colors.green[100]:Colors.red[100], child:Row( mainAxisAlignment:MainAxisAlignment.spaceBetween, children:[Text('状态: $_status', style:constTextStyle(fontWeight:FontWeight.bold)),if(_status =='未连接'|| _status =='已断开'|| _status =='连接失败')ElevatedButton(onPressed: _connect, child:constText('连接'))elseElevatedButton(onPressed: client.disconnect, child:constText('断开')),],),),Padding( padding:constEdgeInsets.all(8.0), child:Row( children:[Expanded( child:TextField( controller: _msgController, decoration:InputDecoration( labelText:'发送消息到 $_topic', border:constOutlineInputBorder(),),),),IconButton(icon:constIcon(Icons.send), onPressed: _status =='已连接'? _publish :null),],),),Expanded( child:ListView.builder( itemCount: _messages.length, itemBuilder:(context, index)=>ListTile( title:Text(_messages[index]), leading: _messages[index].startsWith('[发送]')?constIcon(Icons.arrow_upward, color:Colors.blue):constIcon(Icons.arrow_downward, color:Colors.green),),),),],),);}}
在这里插入图片描述

六、总结

mqtt_client 提供了灵活且强大的 MQTT 能力。

最佳实践

  1. QoS 选择:一般场景使用 QoS 1 即可,QoS 2 开销较大。
  2. 异常捕获:网络环境复杂,务必捕获 SocketException
  3. 资源管理:页面销毁时及时 disconnect,避免内存泄漏。

Read more

Git: filter-repo历史重写工具介绍

文章目录 * `git-filter-repo` 深度指南:安全高效重写 Git 历史 * 一、为什么需要 `git-filter-repo`?核心价值 * 1. 与传统工具对比 * 2. 核心优势 * 二、安装与验证 * 1. 安装方法 * 2. 验证安装 * 三、核心工作原理 * 1. 执行流程 * 2. 关键安全机制 * 四、10 大高频使用场景(含完整命令) * 场景 1:彻底删除目录(用户原始需求) * 场景 2:删除大文件(仓库瘦身) * 场景 3:重写提交者信息(合规需求) * 场景 4:提取子目录为独立仓库(微服务拆分) * 场景 5:合并多个仓库历史 * 场景

By Ne0inhk

Fast-GitHub网络加速完全指南:彻底解决GitHub访问卡顿问题

还在为GitHub加载缓慢、下载龟速而困扰吗?Fast-GitHub作为一款专为国内开发者设计的智能网络加速插件,通过创新的技术手段彻底优化GitHub访问体验。这款工具能够智能识别GitHub资源请求,自动切换到最优网络节点,让你的开发效率实现质的飞跃。 【免费下载链接】Fast-GitHub国内Github下载很慢,用上了这个插件后,下载速度嗖嗖嗖的~! 项目地址: https://gitcode.com/gh_mirrors/fa/Fast-GitHub 🎯 GitHub访问难题的终极解决方案 常见困扰场景: * 紧急项目需要从GitHub拉取依赖包,但网络连接频繁中断 * 重要的开源代码库下载进度长期停滞 * 团队协作时无法及时获取最新代码更新 传统方法对比分析: 加速方式操作复杂度稳定性维护成本系统代理中等一般需要定期调整网络加速服务复杂不稳定额外费用支出 * 系统代理:设置繁琐,影响其他应用运行 * 网络加速服务:配置复杂,稳定性难以保证 * hosts调整:需要手动维护,容易失效 Fast-GitHub采用轻量级插件设计,一键安装即

By Ne0inhk
Answer 开源平台搭建:cpolar 内网穿透服务助力全球用户社区构建

Answer 开源平台搭建:cpolar 内网穿透服务助力全球用户社区构建

文章目录 * 前言 * 1. 本地安装Docker * 2. 本地部署Apache Answer * 2.1 设置语言选择简体中文 * 2.2 配置数据库 * 2.3 创建配置文件 * 2.4 填写基本信息 * 3. 如何使用Apache Answer * 3.1 后台管理 * 3.2 提问与回答 * 3.3 查看主页回答情况 * 4. 公网远程访问本地 Apache Answer * 4.1 内网穿透工具安装 * 4.2 创建远程连接公网地址 * 5. 固定Apache Answer公网地址 前言 在开源社区运营中,问答平台的全球化访问始终面临双重挑战:一方面需要保障数据主权与隐私安全,另一方面要实现低延迟的跨地域访问。Answer

By Ne0inhk
开源模型应用落地-安全合规篇-用户输入价值观判断(四)

开源模型应用落地-安全合规篇-用户输入价值观判断(四)

一、前言     在深度合规功能中,对用户输入内容的价值观判断具有重要意义。这一功能不仅仅是对信息合法性和合规性的简单审核,更是对信息背后隐含的伦理道德和社会责任的深刻洞察。通过对价值观的判断,系统能够识别可能引发不当影响或冲突的内容,从而为用户提供更安全、更和谐的交流环境。这种机制有助于培养用户积极、正向的思维方式,鼓励多元化但和谐的观点交流。     同时,它还能够保护易受伤害群体,防止网络欺凌、仇恨言论或其他形式的负面影响,使整个网络环境更具包容性和合作性。因此,价值观判断不仅提升了技术系统的智能性,还在更广泛的社会层面推动了道德标准的提高和社会文化的进步。     本篇介绍如何将“开源模型应用落地-安全合规篇-用户输入价值观判断(三)”功能集成进AI服务。     前置学习:     开源模型应用落地-安全合规篇-用户输入合规性检测(一)

By Ne0inhk