Flutter for OpenHarmony:mqtt_client 连接 MQTT 代理,实现物联网(IoT)设备实时状态监控(轻量级发布订阅协议) 深度解析与鸿蒙适配指南
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net

前言
MQTT (Message Queuing Telemetry Transport) 是一种极轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)、移动应用和车载设备。在智能家居控制、设备状态上报等场景中,APP 往往需要实时接收设备发来的消息。
mqtt_client 是 Dart 生态中最流行的 MQTT 客户端库,支持 MQTT 3.1 和 3.1.1 协议。它能够在 OpenHarmony 应用中稳定运行,帮助开发者轻松构建物联网控制端。
一、概念介绍/原理解析
1.1 基础概念
- Broker (代理): 消息的转发服务器(如 EMQX, Mosquitto)。
- Topic (主题): 消息的分类标签(如
home/livingroom/temp)。 - QoS (服务质量):
- 0: 最多一次 (Fire and Forget)
- 1: 至少一次 (Acknowledged delivery)
- 2: 只有一次 (Exact delivery)
发布: 25°C
转发
订阅主题
呈现
传感器设备
环境主题
MQTT 代理服务器
OpenHarmony 应用
界面显示: 25°C
1.2 进阶概念
在移动端使用 MQTT,通常需要关注断线重连(Keep Alive)和Clean Session(是否接收离线消息)。
二、核心 API/组件详解
2.1 基础用法
创建客户端并连接。
import'package:mqtt_client/mqtt_client.dart';import'package:mqtt_client/mqtt_server_client.dart';// 使用 Server ClientFuture<void>connect()async{final client =MqttServerClient('broker.emqx.io','flutter_client_id'); client.logging(on:false); client.keepAlivePeriod =20;try{await client.connect();print('已连接');}catch(e){print('连接失败: $e'); client.disconnect();}}
2.2 高级定制
处理连接状态回调和订阅消息。
// 设置回调 client.onConnected =()=>print('Connected'); client.onDisconnected =()=>print('Disconnected'); client.onSubscribed =(topic)=>print('Subscribed to $topic');// 订阅主题 client.subscribe('test/topic',MqttQos.atLeastOnce);// 监听消息 client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final pt =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);print('收到消息: $pt');});
三、常见应用场景
3.1 场景 1:智能家居控制
用户点击开关,发送控制指令;同时订阅设备状态更新 UI。
// 发送指令:打开客厅灯voidturnOnLight(){final builder =MqttClientPayloadBuilder(); builder.addString('{"state": "ON"}'); client.publishMessage('home/livingroom/light/set',MqttQos.atLeastOnce, builder.payload!);}
3.2 场景 2:环境数据监测
实时接收温度、湿度传感器上报的数据,并绘制成图表。
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final pt =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);// 更新图表数据 chartData.add(parseSensorData(pt));});
3.3 场景 3:实时聊天/通知
虽然 IM 通常用 WebSocket,但在低带宽环境下,MQTT 也是不错的选择。
// 订阅个人消息主题 client.subscribe('users/$myUserId/inbox',MqttQos.exactlyOnce);// 收到消息弹窗 client.updates!.listen((event){// 显示通知栏消息});
四、OpenHarmony 平台适配
4.1 网络权限
"requestPermissions":[{"name":"ohos.permission.INTERNET"}]4.2 后台保活
MQTT 需要保持长连接心跳。OpenHarmony 系统可能会在后台冻结应用网络。如果是关键业务,可能需要申请长时任务(Continuous Task)或使用推送服务辅助唤醒。
五、完整示例代码
本示例展示一个简单的 MQTT 控制台,连接公共 Broker,订阅并发送消息。
import'dart:io';import'package:flutter/material.dart';import'package:mqtt_client/mqtt_client.dart';import'package:mqtt_client/mqtt_server_client.dart';voidmain(){runApp(constMaterialApp(home:MqttPage()));}classMqttPageextendsStatefulWidget{constMqttPage({super.key});@overrideState<MqttPage>createState()=>_MqttPageState();}class _MqttPageState extendsState<MqttPage>{// 使用 EMQX 的公共测试服务器final client =MqttServerClient('broker.emqx.io','ohos_client_${DateTime.now().millisecondsSinceEpoch}');String _status ='未连接';finalList<String> _messages =[];finalTextEditingController _msgController =TextEditingController();finalString _topic ='flutter/ohos/test';Future<void>_connect()async{setState(()=> _status ='连接中...'); client.logging(on:true); client.keepAlivePeriod =60; client.onDisconnected =()=>setState(()=> _status ='已断开'); client.onConnected =()=>setState(()=> _status ='已连接');// 设置连接消息final connMess =MqttConnectMessage().withClientIdentifier('ohos_client').withWillTopic('willtopic').withWillMessage('My Will message').startClean().withWillQos(MqttQos.atLeastOnce); client.connectionMessage = connMess;try{await client.connect();}onNoConnectionExceptioncatch(e){// 客户端异常print('Client exception: $e'); client.disconnect();}onSocketExceptioncatch(e){// Socket 异常print('Socket exception: $e'); client.disconnect();}if(client.connectionStatus!.state ==MqttConnectionState.connected){_subscribe();}else{setState(()=> _status ='连接失败'); client.disconnect();}}void_subscribe(){ client.subscribe(_topic,MqttQos.atLeastOnce); client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c){finalMqttPublishMessage recMess = c[0].payload asMqttPublishMessage;final payload =MqttPublishPayload.bytesToStringAsString(recMess.payload.message);setState((){ _messages.insert(0,'[收到] $payload');});});}void_publish(){final builder =MqttClientPayloadBuilder(); builder.addString(_msgController.text); client.publishMessage(_topic,MqttQos.exactlyOnce, builder.payload!);setState((){ _messages.insert(0,'[发送] ${_msgController.text}'); _msgController.clear();});}@overridevoiddispose(){ client.disconnect();super.dispose();}@overrideWidgetbuild(BuildContext context){returnScaffold( appBar:AppBar(title:constText('MQTT Client Demo')), body:Column( children:[Container( padding:constEdgeInsets.all(16), color: _status =='已连接'?Colors.green[100]:Colors.red[100], child:Row( mainAxisAlignment:MainAxisAlignment.spaceBetween, children:[Text('状态: $_status', style:constTextStyle(fontWeight:FontWeight.bold)),if(_status =='未连接'|| _status =='已断开'|| _status =='连接失败')ElevatedButton(onPressed: _connect, child:constText('连接'))elseElevatedButton(onPressed: client.disconnect, child:constText('断开')),],),),Padding( padding:constEdgeInsets.all(8.0), child:Row( children:[Expanded( child:TextField( controller: _msgController, decoration:InputDecoration( labelText:'发送消息到 $_topic', border:constOutlineInputBorder(),),),),IconButton(icon:constIcon(Icons.send), onPressed: _status =='已连接'? _publish :null),],),),Expanded( child:ListView.builder( itemCount: _messages.length, itemBuilder:(context, index)=>ListTile( title:Text(_messages[index]), leading: _messages[index].startsWith('[发送]')?constIcon(Icons.arrow_upward, color:Colors.blue):constIcon(Icons.arrow_downward, color:Colors.green),),),),],),);}}
六、总结
mqtt_client 提供了灵活且强大的 MQTT 能力。
最佳实践:
- QoS 选择:一般场景使用 QoS 1 即可,QoS 2 开销较大。
- 异常捕获:网络环境复杂,务必捕获
SocketException。 - 资源管理:页面销毁时及时
disconnect,避免内存泄漏。