Python PyModbus 模块详解
一、详解
pymodbus 是一个基于 Python 的开源库,用于实现 Modbus 通信协议。Modbus 是一种广泛应用于工业自动化领域的串行通信协议,支持主从架构(客户端 - 服务器模型)。pymodbus 库提供了完整的 Modbus 客户端(主站)和服务器(从站)功能,支持多种传输方式,包括 Modbus RTU(串行通信)和 Modbus TCP(以太网通信)。
Python 中的 pymodbus 开源库,涵盖 Modbus 协议基础概念(如寄存器类型、功能码)、核心功能(客户端/服务器、同步/异步模式)、安装配置及多场景代码示例(TCP/RTU)。文章还包含高级特性说明、性能优化建议及常见错误处理方案,旨在帮助开发者快速掌握工业通信协议的 Python 实现。

pymodbus 是一个基于 Python 的开源库,用于实现 Modbus 通信协议。Modbus 是一种广泛应用于工业自动化领域的串行通信协议,支持主从架构(客户端 - 服务器模型)。pymodbus 库提供了完整的 Modbus 客户端(主站)和服务器(从站)功能,支持多种传输方式,包括 Modbus RTU(串行通信)和 Modbus TCP(以太网通信)。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
Modbus 协议定义了一套标准的数据模型和通信规则:
pymodbus 库实现了这些概念,支持 Python 3.6 及以上版本,并兼容多种操作系统(Windows、Linux、macOS)。
pymodbus 提供模块化设计,主要组件包括:
read_coils、write_register。ModbusServerContext)。关键优势:
pymodbus.client)和异步(pymodbus.async)编程。安装 pymodbus 简单,通过 pip 命令即可:
pip install pymodbus
安装后,导入所需模块:
from pymodbus.client import ModbusTcpClient # TCP 客户端
from pymodbus.server import StartTcpServer # TCP 服务器
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext # 数据存储
环境要求:
以下示例展示常见场景,代码基于同步模式(异步模式类似,使用 asyncio)。
示例 1: Modbus TCP 客户端(读取保持寄存器)
作为客户端,连接到服务器并读取数据。
from pymodbus.client import ModbusTcpClient
# 连接到服务器(假设 IP: 127.0.0.1, 端口: 502)
client = ModbusTcpClient('127.0.0.1', port=502)
client.connect()
# 读取保持寄存器(地址 0,数量 2 个寄存器)
result = client.read_holding_registers(address=0, count=2, unit=1)
# unit 为从站 ID
if not result.isError():
print("读取成功:", result.registers)
# 输出寄存器值列表,如 [100, 200]
else:
print("错误:", result)
client.close() # 关闭连接
示例 2: Modbus TCP 服务器(模拟从站)
创建服务器并存储数据。
from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.datastore import ModbusSequentialDataBlock
# 初始化数据存储:设置保持寄存器(地址 0-9,初始值 0)
store = ModbusSlaveContext(
hr=ModbusSequentialDataBlock(0, [0]*10) # hr 表示保持寄存器
)
context = ModbusServerContext(slaves=store, single=True)
# single 模式为单从站
# 启动服务器(端口 502)
StartTcpServer(context=context, address=("0.0.0.0", 502))
示例 3: Modbus RTU 通信(使用串口)
适用于串行设备。
from pymodbus.client import ModbusSerialClient
# 配置串口(端口 COM1, 波特率 9600)
client = ModbusSerialClient(method='rtu', port='COM1', baudrate=9600)
client.connect()
# 写单个线圈(地址 0,值 True)
client.write_coil(address=0, value=True, unit=1)
client.close()
pymodbus 支持更复杂场景:
ModbusRequestHandler 处理特定逻辑。import logging),便于调试通信问题。异步模式:使用 AsyncModbusTcpClient 结合 asyncio,提高并发性能。
import asyncio
from pymodbus.client import AsyncModbusTcpClient
async def main():
client = AsyncModbusTcpClient('127.0.0.1', port=502)
await client.connect()
result = await client.read_holding_registers(0, 2)
print(result.registers)
await client.close()
asyncio.run(main())
性能优化建议:
read_holding_registers 中 count 参数)。错误处理:pymodbus 可能抛出异常(如 ModbusIOException),需捕获处理:
try:
result = client.read_coils(0, 1)
except Exception as e:
print("通信错误:", e)
from time import sleep
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient('127.0.0.1')
client.connect()
while True:
res = client.read_holding_registers(slave=1, address=0, count=10)
print(res.registers)
sleep(0.5)
from time import sleep
from pymodbus.client import ModbusSerialClient
client = ModbusSerialClient(method='rtu', port='COM1', baudrate=9600, timeout=1)
connection = client.connect()
while True:
res = client.read_holding_registers(slave=1, address=0, count=10)
print(res.registers)
sleep(0.5)