MQTT 控制信号
MQTT 控制信号
本节详细介绍自动投料系统的 MQTT 通信协议设计,包括 Topic 结构、数据格式和通信流程。学习完成后,您将能够:
- 设计清晰的 MQTT Topic 层级结构
- 实现 ESP32 和 Node-RED 之间的请求-响应通信
- 处理通信超时和错误场景
- 为多设备部署设计可扩展的 Topic 命名
Topic Structure
Section titled “Topic Structure”esp32/dosing/├── info # ESP32 → Node-RED: 传感器数据 (上行)├── control # Node-RED → ESP32: 投料指令 (下行)├── command # Node-RED → ESP32: 其他命令├── status # ESP32 → Node-RED: 设备状态├── sleep_record # ESP32 → Node-RED: 睡眠记录├── config # 双向: 远程配置├── alert # ESP32 → Node-RED: 告警└── ack # Node-RED → ESP32: 确认消息Data Flow Sequence
Section titled “Data Flow Sequence”ESP32 Node-RED │ │ │ 连接 MQTT Broker │ │──────────────────────────────>│ │ │ │ 订阅: esp32/dosing/control │ │ 订阅: esp32/dosing/command │ │ │ │ 发布: esp32/dosing/info │ │ {distance:16, level:33} │ │──────────────────────────────>│ │ │ │ [Node-RED 处理中] │ │ - 查询数据库 │ │ - 时间差计算 │ │ - 条件判断 │ │ │ │ 发布: control │ │ 1 (启动投料) │ │<──────────────────────────────│ │ │ │ [ESP32 执行投料] │ │ 发布: esp32/dosing/status │ │ {state:"pumping"} │ │──────────────────────────────>│ │ │ │ [投料完成] │ │ 发布: esp32/dosing/status │ │ {state:"complete"} │ │──────────────────────────────>│ │ │ │ 发布: esp32/dosing/ │ │ sleep_record │ │ {boot:5, total_dosing:3} │ │──────────────────────────────>│ │ │ │ 进入深度睡眠 │ │ [1 小时后重新唤醒] │Message Payload Schemas
Section titled “Message Payload Schemas”1. Info (上行 - 传感器数据)
Section titled “1. Info (上行 - 传感器数据)”{ "topic": "esp32/dosing/info", "payload": { "distance": 16, "level": 33, "container_height": 24, "boot": 5, "total_dosing": 3, "sensor_type": "ultrasonic", "state": "get_data", "timestamp": 1716013825 }, "qos": 1, "retain": false}2. Control (下行 - 投料指令)
Section titled “2. Control (下行 - 投料指令)”{ "topic": "esp32/dosing/control", "payload": "1", "qos": 1, "retain": false}注意: control 消息使用简单字符串 “1” 或 “0”,而非 JSON。ESP32 解析更简单、更快速。
3. Command (下行 - 控制命令)
Section titled “3. Command (下行 - 控制命令)”{ "topic": "esp32/dosing/command", "payload": { "command": "get_data", "params": { "samples": 5 } }, "qos": 1}支持的命令列表:
| 命令 | 说明 | 参数 |
|---|---|---|
get_data | 读取传感器数据 | samples (采样次数) |
calibrate_empty | 空容器校准 | - |
calibrate_full | 满容器校准 | - |
reboot | 重启 ESP32 | - |
set_config | 设置配置 | pump_duration, interval 等 |
4. Status (上行 - 设备状态)
Section titled “4. Status (上行 - 设备状态)”{ "topic": "esp32/dosing/status", "payload": { "state": "pumping", "state_code": 2, "message": "Dosing in progress (3s remaining)", "rssi": -65, "heap_free": 128000 }, "qos": 0}状态枚举:
| state | state_code | 说明 |
|---|---|---|
| idle | 0 | 空闲等待 |
| get_data | 1 | 获取数据中 |
| pumping | 2 | 投料中 |
| complete | 3 | 投料完成 |
| sleep | 4 | 进入睡眠 |
| error | 99 | 错误状态 |
5. Sleep Record (上行 - 睡眠记录)
Section titled “5. Sleep Record (上行 - 睡眠记录)”{ "topic": "esp32/dosing/sleep_record", "payload": { "boot": 5, "total_dosing": 3, "last_error": "", "uptime": 45000, "free_heap": 128000 }, "qos": 1}6. Config (双向 - 配置)
Section titled “6. Config (双向 - 配置)”{ "topic": "esp32/dosing/config", "payload": { "pump_duration_ms": 3000, "wifi_timeout_s": 20, "mqtt_retry_interval_s": 5, "sensor_samples": 5, "deep_sleep_hours": 1 }, "qos": 1, "retain": true}ESP32 MQTT Implementation
Section titled “ESP32 MQTT Implementation”// MQTT 消息处理void setupMQTT() { client.setServer(mqtt_server, mqtt_port); client.setBufferSize(512); // 足够 JSON 负载 client.setCallback(callback);}
void connectMQTT() { while (!client.connected()) { if (client.connect(client_id, mqtt_user, mqtt_pass)) { // 订阅控制 Topic client.subscribe("esp32/dosing/control"); client.subscribe("esp32/dosing/command"); client.subscribe("esp32/dosing/config");
// 发布上线状态 client.publish("esp32/dosing/status", "{\"state\":\"online\"}", true); } else { delay(5000); } }}
// 发布 Info (传感器数据)void publishInfo(long distance, int level) { String payload = "{"; payload += "\"distance\":" + String(distance) + ","; payload += "\"level\":" + String(level) + ","; payload += "\"boot\":" + String(bootCount) + ","; payload += "\"total_dosing\":" + String(totalDosingCount); payload += "}";
client.publish("esp32/dosing/info", payload.c_str());}
// 发布 Statusvoid publishStatus(const char* status, const char* message) { String payload = "{"; payload += "\"state\":\"" + String(status) + "\","; payload += "\"message\":\"" + String(message) + "\","; payload += "\"rssi\":" + String(WiFi.RSSI()); payload += "}";
client.publish("esp32/dosing/status", payload.c_str());}
// MQTT 回调void callback(char* topic, byte* payload, unsigned int length) { String topicStr = String(topic); String message;
for (int i = 0; i < length; i++) { message += (char)payload[i]; }
// 投料控制指令 if (topicStr == "esp32/dosing/control") { if (message == "1") { currentState = STATE_PUMPING; } else { currentState = STATE_TIME_FOR_SLEEP; } }
// 命令处理 if (topicStr == "esp32/dosing/command") { if (message == "get_data") { currentState = STATE_GET_DATA; } else if (message == "reboot") { ESP.restart(); } }
// 配置更新 if (topicStr == "esp32/dosing/config") { // 解析 JSON 配置并更新 // ... }}Multi-Device Topic Naming
Section titled “Multi-Device Topic Naming”当部署多个投料设备时,使用设备 ID 隔离 Topic:
esp32/dosing/{device_id}/├── info # esp32/dosing/device_01/info├── control # esp32/dosing/device_01/control├── status # esp32/dosing/device_01/status└── sleep_record # esp32/dosing/device_01/sleep_record// 设备 ID 配置const char* device_id = "doser_01";const char* topic_info = "esp32/dosing/doser_01/info";const char* topic_control = "esp32/dosing/doser_01/control";
// 订阅时使用设备 IDclient.subscribe(topic_control);# 1. 监控所有 Topicmosquitto_sub -t "esp32/dosing/#" -v
# 2. 模拟 ESP32 发送数据mosquitto_pub -t "esp32/dosing/info" \ -m '{"distance":16,"level":33,"boot":5}'
# 3. 发送控制指令mosquitto_pub -t "esp32/dosing/control" -m "1"
# 4. 发送命令mosquitto_pub -t "esp32/dosing/command" -m "get_data"
# 5. 测试 retain 消息 (配置持久化)mosquitto_pub -t "esp32/dosing/config" -r \ -m '{"pump_duration_ms":5000}'Common Customer Questions
Section titled “Common Customer Questions”Q1: 为什么 control 使用简单字符串而非 JSON?
Section titled “Q1: 为什么 control 使用简单字符串而非 JSON?”ESP32 资源有限,解析简单字符串比解析 JSON 快得多,且不会因 JSON 解析失败导致误操作。传感器数据等复杂结构才使用 JSON。
Q2: 多个投料设备如何共享同一个 Node-RED 流程?
Section titled “Q2: 多个投料设备如何共享同一个 Node-RED 流程?”使用 MQTT 通配符订阅和动态 Topic 解析:
// Node-RED: MQTT In 订阅 "esp32/dosing/+/info"// 使用 Topic 中的通配符捕获设备 IDvar topic = msg.topic;var deviceId = topic.split('/')[2]; // doser_01
// 使用 deviceId 区分数据源flow.set(deviceId + "_lastData", msg.payload);Q3: Qos 级别如何选择?
Section titled “Q3: Qos 级别如何选择?”| Topic | QoS | 理由 |
|---|---|---|
| info | 1 | 传感器数据需要确保送达 |
| control | 1 | 投料指令必须执行 |
| status | 0 | 状态变化频繁,丢失可接受 |
| command | 1 | 关键命令确保执行 |
| config | 1 + retain | 配置需要持久化 |
✅ 推荐做法:
- Topic 使用层级结构,便于使用通配符
- 控制指令使用简单字符串(“1”/“0”),避免解析开销
- 传感器数据使用 JSON 结构,便于扩展
- 配置消息使用 retain 标志,设备上线即可获取最新配置
- 多设备部署时使用设备 ID 隔离 Topic
❌ 避免做法:
- 所有消息使用 QoS 2(过于重量级,增加延迟)
- 设备 ID 硬编码在 Topic 字符串中而非使用变量
- 控制指令使用复杂 JSON 格式
- 忽略 retain 消息的清理(配置变更后需清除旧 retain)
Summary
Section titled “Summary”- Topic 层级:
esp32/dosing/{category}清晰划分上行/下行 - 请求-响应: ESP32 发布 info → 等待 control 指令
- 消息格式: 控制指令用字符串,数据用 JSON
- QoS 策略: 关键消息 QoS 1,状态消息 QoS 0
- 多设备扩展: 使用设备 ID 隔离 Topic 空间