跳转到内容

MQTT 控制信号

MQTT 控制信号

本节详细介绍自动投料系统的 MQTT 通信协议设计,包括 Topic 结构、数据格式和通信流程。学习完成后,您将能够:

  • 设计清晰的 MQTT Topic 层级结构
  • 实现 ESP32 和 Node-RED 之间的请求-响应通信
  • 处理通信超时和错误场景
  • 为多设备部署设计可扩展的 Topic 命名
esp32/dosing/
├── info # ESP32 → Node-RED: 传感器数据 (上行)
├── control # Node-RED → ESP32: 投料指令 (下行)
├── command # Node-RED → ESP32: 其他命令
├── status # ESP32 → Node-RED: 设备状态
├── sleep_record # ESP32 → Node-RED: 睡眠记录
├── config # 双向: 远程配置
├── alert # ESP32 → Node-RED: 告警
└── ack # Node-RED → ESP32: 确认消息
ESP32 Node-RED
│ │
│ 连接 MQTT Broker │
│──────────────────────────────>│
│ │
│ 订阅: esp32/dosing/control │
│ 订阅: esp32/dosing/command │
│ │
│ 发布: esp32/dosing/info │
│ {distance:16, level:33} │
│──────────────────────────────>│
│ │
│ [Node-RED 处理中] │
│ - 查询数据库 │
│ - 时间差计算 │
│ - 条件判断 │
│ │
│ 发布: control │
│ 1 (启动投料) │
│<──────────────────────────────│
│ │
│ [ESP32 执行投料] │
│ 发布: esp32/dosing/status │
│ {state:"pumping"} │
│──────────────────────────────>│
│ │
│ [投料完成] │
│ 发布: esp32/dosing/status │
│ {state:"complete"} │
│──────────────────────────────>│
│ │
│ 发布: esp32/dosing/ │
│ sleep_record │
│ {boot:5, total_dosing:3} │
│──────────────────────────────>│
│ │
│ 进入深度睡眠 │
│ [1 小时后重新唤醒] │
{
"topic": "esp32/dosing/info",
"payload": {
"distance": 16,
"level": 33,
"container_height": 24,
"boot": 5,
"total_dosing": 3,
"sensor_type": "ultrasonic",
"state": "get_data",
"timestamp": 1716013825
},
"qos": 1,
"retain": false
}
{
"topic": "esp32/dosing/control",
"payload": "1",
"qos": 1,
"retain": false
}

注意: control 消息使用简单字符串 “1” 或 “0”,而非 JSON。ESP32 解析更简单、更快速。

{
"topic": "esp32/dosing/command",
"payload": {
"command": "get_data",
"params": {
"samples": 5
}
},
"qos": 1
}

支持的命令列表:

命令说明参数
get_data读取传感器数据samples (采样次数)
calibrate_empty空容器校准-
calibrate_full满容器校准-
reboot重启 ESP32-
set_config设置配置pump_duration, interval 等
{
"topic": "esp32/dosing/status",
"payload": {
"state": "pumping",
"state_code": 2,
"message": "Dosing in progress (3s remaining)",
"rssi": -65,
"heap_free": 128000
},
"qos": 0
}

状态枚举:

statestate_code说明
idle0空闲等待
get_data1获取数据中
pumping2投料中
complete3投料完成
sleep4进入睡眠
error99错误状态
{
"topic": "esp32/dosing/sleep_record",
"payload": {
"boot": 5,
"total_dosing": 3,
"last_error": "",
"uptime": 45000,
"free_heap": 128000
},
"qos": 1
}
{
"topic": "esp32/dosing/config",
"payload": {
"pump_duration_ms": 3000,
"wifi_timeout_s": 20,
"mqtt_retry_interval_s": 5,
"sensor_samples": 5,
"deep_sleep_hours": 1
},
"qos": 1,
"retain": true
}
// MQTT 消息处理
void setupMQTT() {
client.setServer(mqtt_server, mqtt_port);
client.setBufferSize(512); // 足够 JSON 负载
client.setCallback(callback);
}
void connectMQTT() {
while (!client.connected()) {
if (client.connect(client_id, mqtt_user, mqtt_pass)) {
// 订阅控制 Topic
client.subscribe("esp32/dosing/control");
client.subscribe("esp32/dosing/command");
client.subscribe("esp32/dosing/config");
// 发布上线状态
client.publish("esp32/dosing/status",
"{\"state\":\"online\"}", true);
} else {
delay(5000);
}
}
}
// 发布 Info (传感器数据)
void publishInfo(long distance, int level) {
String payload = "{";
payload += "\"distance\":" + String(distance) + ",";
payload += "\"level\":" + String(level) + ",";
payload += "\"boot\":" + String(bootCount) + ",";
payload += "\"total_dosing\":" + String(totalDosingCount);
payload += "}";
client.publish("esp32/dosing/info", payload.c_str());
}
// 发布 Status
void publishStatus(const char* status, const char* message) {
String payload = "{";
payload += "\"state\":\"" + String(status) + "\",";
payload += "\"message\":\"" + String(message) + "\",";
payload += "\"rssi\":" + String(WiFi.RSSI());
payload += "}";
client.publish("esp32/dosing/status", payload.c_str());
}
// MQTT 回调
void callback(char* topic, byte* payload, unsigned int length) {
String topicStr = String(topic);
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
// 投料控制指令
if (topicStr == "esp32/dosing/control") {
if (message == "1") {
currentState = STATE_PUMPING;
} else {
currentState = STATE_TIME_FOR_SLEEP;
}
}
// 命令处理
if (topicStr == "esp32/dosing/command") {
if (message == "get_data") {
currentState = STATE_GET_DATA;
} else if (message == "reboot") {
ESP.restart();
}
}
// 配置更新
if (topicStr == "esp32/dosing/config") {
// 解析 JSON 配置并更新
// ...
}
}

当部署多个投料设备时,使用设备 ID 隔离 Topic:

esp32/dosing/{device_id}/
├── info # esp32/dosing/device_01/info
├── control # esp32/dosing/device_01/control
├── status # esp32/dosing/device_01/status
└── sleep_record # esp32/dosing/device_01/sleep_record
// 设备 ID 配置
const char* device_id = "doser_01";
const char* topic_info = "esp32/dosing/doser_01/info";
const char* topic_control = "esp32/dosing/doser_01/control";
// 订阅时使用设备 ID
client.subscribe(topic_control);
Terminal window
# 1. 监控所有 Topic
mosquitto_sub -t "esp32/dosing/#" -v
# 2. 模拟 ESP32 发送数据
mosquitto_pub -t "esp32/dosing/info" \
-m '{"distance":16,"level":33,"boot":5}'
# 3. 发送控制指令
mosquitto_pub -t "esp32/dosing/control" -m "1"
# 4. 发送命令
mosquitto_pub -t "esp32/dosing/command" -m "get_data"
# 5. 测试 retain 消息 (配置持久化)
mosquitto_pub -t "esp32/dosing/config" -r \
-m '{"pump_duration_ms":5000}'

Q1: 为什么 control 使用简单字符串而非 JSON?

Section titled “Q1: 为什么 control 使用简单字符串而非 JSON?”

ESP32 资源有限,解析简单字符串比解析 JSON 快得多,且不会因 JSON 解析失败导致误操作。传感器数据等复杂结构才使用 JSON。

Q2: 多个投料设备如何共享同一个 Node-RED 流程?

Section titled “Q2: 多个投料设备如何共享同一个 Node-RED 流程?”

使用 MQTT 通配符订阅和动态 Topic 解析:

// Node-RED: MQTT In 订阅 "esp32/dosing/+/info"
// 使用 Topic 中的通配符捕获设备 ID
var topic = msg.topic;
var deviceId = topic.split('/')[2]; // doser_01
// 使用 deviceId 区分数据源
flow.set(deviceId + "_lastData", msg.payload);
TopicQoS理由
info1传感器数据需要确保送达
control1投料指令必须执行
status0状态变化频繁,丢失可接受
command1关键命令确保执行
config1 + retain配置需要持久化

推荐做法:

  • Topic 使用层级结构,便于使用通配符
  • 控制指令使用简单字符串(“1”/“0”),避免解析开销
  • 传感器数据使用 JSON 结构,便于扩展
  • 配置消息使用 retain 标志,设备上线即可获取最新配置
  • 多设备部署时使用设备 ID 隔离 Topic

避免做法:

  • 所有消息使用 QoS 2(过于重量级,增加延迟)
  • 设备 ID 硬编码在 Topic 字符串中而非使用变量
  • 控制指令使用复杂 JSON 格式
  • 忽略 retain 消息的清理(配置变更后需清除旧 retain)
  1. Topic 层级: esp32/dosing/{category} 清晰划分上行/下行
  2. 请求-响应: ESP32 发布 info → 等待 control 指令
  3. 消息格式: 控制指令用字符串,数据用 JSON
  4. QoS 策略: 关键消息 QoS 1,状态消息 QoS 0
  5. 多设备扩展: 使用设备 ID 隔离 Topic 空间