跳转到内容

MQTT发布与订阅

本节涵盖 ESP32 与 Node-RED 之间的双向 MQTT 通信模式。学完本节后,你将能够:

  • 从 ESP32 发布传感器数据到 MQTT 主题
  • 订阅来自 Node-RED 的控制命令
  • 实现双向通信(ESP32 → Node-RED、Node-RED → ESP32)
  • 为你的项目设计有效的 MQTT 主题结构
  • 已在 ESP32 上配置 MQTT 客户端(01-11)
  • 基础草图架构(01-12)
  • 已配置 Node-RED MQTT 节点(或使用 MQTT Explorer 进行测试)

MQTT 通信有两个方向:

ESP32 → MQTT Broker → Node-RED: "我有新的传感器数据"
(ESP32 发布,Node-RED 订阅)
Node-RED → MQTT Broker → ESP32: "打开 LED"
(Node-RED 发布,ESP32 订阅)

这种模式将发送方和接收方解耦——它们无需知道彼此的地址,甚至无需同时在线。

ESP32 发布其状态或传感器读取数据。MQTT 主题反映数据内容并遵循层级结构:

主题载荷频率
factory/room01/temperature25.5每 30 秒
factory/room01/humidity65%每 30 秒
factory/device01/status”online”/“offline”变化时
factory/device01/button”pressed”事件触发

ESP32 订阅携带命令或配置的主题:

主题载荷(示例)效果
esp32/led/command”ON” / “OFF”控制 LED 状态
esp32/sensor/interval”10000”更改传感器间隔
esp32/config{"threshold": 30}更新配置

这是最常见的模式——ESP32 定期发送数据:

#include <WiFi.h>
#include <PubSubClient.h>
WiFiClient wifiClient;
PubSubClient client(wifiClient);
// 用于发布的 MQTT 主题
const char* dataTopic = "esp32/sensor/data";
void publishTemperature(float temperature) {
char payload[50];
snprintf(payload, sizeof(payload), "{\"temperature\": %.1f}", temperature);
if (client.publish(dataTopic, payload)) {
Serial.print("已发布到 ");
Serial.print(dataTopic);
Serial.print(": ");
Serial.println(payload);
} else {
Serial.println("发布失败!");
}
}

在 Node-RED 端,添加一个 MQTT In 节点,配置为订阅 esp32/sensor/data。将其连接到一个 Debug 节点以查看传入数据。

ESP32 在 setup() 中订阅命令主题:

void setup() {
// ... Wi-Fi 连接 ...
client.setServer(mqttServer, 1883);
client.setCallback(callback);
client.connect("esp32-device");
// 订阅命令主题
client.subscribe("esp32/control/led");
client.subscribe("esp32/control/config");
Serial.println("已订阅控制主题");
}
void callback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
String topicStr = String(topic);
if (topicStr == "esp32/control/led") {
if (message == "ON") {
digitalWrite(2, HIGH);
client.publish("esp32/status/led", "ON");
} else if (message == "OFF") {
digitalWrite(2, LOW);
client.publish("esp32/status/led", "OFF");
}
}
if (topicStr == "esp32/control/config") {
// 解析 JSON 配置
// 参见 01-13 JSON 解析
}
}

包含两个方向的完整示例:

#include <WiFi.h>
#include <PubSubClient.h>
// 主题
const char* topicFromESP = "esp32/sensor/data"; // ESP32 → Node-RED
const char* topicToESP = "esp32/control/led"; // Node-RED → ESP32
const char* topicFeedback = "esp32/status/led"; // ESP32 → Node-RED(确认)
WiFiClient wifiClient;
PubSubClient client(wifiClient);
// LED 状态
int ledPin = 2;
bool ledState = false;
// 定时
unsigned long lastPublish = 0;
const unsigned long PUBLISH_INTERVAL = 5000;
void callback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) message += (char)payload[i];
Serial.printf("收到 [%s]: %s\n", topic, message.c_str());
if (String(topic) == topicToESP) {
if (message == "ON") {
ledState = true;
digitalWrite(ledPin, HIGH);
client.publish(topicFeedback, "LED_ON");
} else if (message == "OFF") {
ledState = false;
digitalWrite(ledPin, LOW);
client.publish(topicFeedback, "LED_OFF");
}
}
}
void setup() {
Serial.begin(115200);
pinMode(ledPin, OUTPUT);
WiFi.begin("SSID", "PASSWORD");
while (WiFi.status() != WL_CONNECTED) delay(500);
client.setServer("192.168.1.100", 1883);
client.setCallback(callback);
while (!client.connect("esp32-bidirectional")) delay(1000);
client.subscribe(topicToESP);
Serial.println("MQTT 双向通信就绪");
}
void loop() {
if (!client.connected()) {
// 重连逻辑
}
client.loop();
// 定期发布传感器数据(ESP32 → Node-RED)
if (millis() - lastPublish >= PUBLISH_INTERVAL) {
lastPublish = millis();
// 模拟传感器数据
char json[100];
snprintf(json, sizeof(json),
"{\"led\":%s,\"uptime\":%lu,\"rssi\":%ld}",
ledState ? "true" : "false",
millis() / 1000,
WiFi.RSSI());
client.publish(topicFromESP, json);
}
}

在 Node-RED 中,创建一个接收 ESP32 数据的流程:

[MQTT In: esp32/sensor/data] → [Function: 解析 JSON] → [Debug]
→ [Function: 显示在仪表板]

MQTT In 节点配置:

  • 服务器:你的 Mosquitto 代理
  • 主题:esp32/sensor/data
  • 输出:auto-detectJSON

Function 节点提取值:

// msg.payload 来自 ESP32 的 JSON
let data = msg.payload;
node.warn(`温度: ${data.temperature}°C`);
node.warn(`湿度: ${data.humidity}%`);
// 转发到仪表板
return msg;

向 ESP32 发送命令:

[Inject: ON] → [MQTT Out: esp32/control/led]
[Inject: OFF] → [MQTT Out: esp32/control/led]

或发送 JSON 命令:

[Inject: {"command":"LED_ON","value":1}] → [MQTT Out: esp32/control/config]

在设置 Node-RED 之前,先用 MQTT Explorer 测试双向通信:

  1. 打开 MQTT Explorer
  2. 连接到代理(与 ESP32 相同的 IP)
  3. esp32/sensor/data 上观察 ESP32 发布的数据
  4. 手动向 esp32/control/led 发布载荷 ON
  5. 观察 ESP32 LED 点亮
  6. 检查 esp32/status/led 上的确认消息
  • ESP32 定期向 MQTT 主题发布数据
  • MQTT Explorer 显示已发布的消息
  • 向控制主题发布 “ON” 点亮 LED
  • 向控制主题发布 “OFF” 熄灭 LED
  • ESP32 发布反馈确认
  • Node-RED 接收并处理 ESP32 数据
  • Node-RED 命令被 ESP32 接收并执行

原因

  • ESP32 发布与 Node-RED 订阅的主题不匹配
  • Node-RED MQTT In 节点配置了错误的代理

解决方案

  1. 验证主题字符串完全一致(区分大小写)
  2. 检查 Node-RED MQTT In 节点是否连接到相同的代理
  3. 使用 MQTT Explorer 作为独立验证工具

原因

  • client.loop() 调用频率不足
  • 未通过 setCallback() 注册回调函数
  • 主题字符串比较失败

解决方案

  1. 确保在每次 loop() 迭代中调用 client.loop()(不要在长延时内部)
  2. 验证已在 setup() 中调用 client.setCallback(callback)
  3. 将两个主题字符串打印到 Serial 以比较确切值
  4. 使用 String(topic) == "your/topic" 进行区分大小写的比较

ESP32 与 Node-RED 之间 LED 状态不同步

Section titled “ESP32 与 Node-RED 之间 LED 状态不同步”

原因:Node-RED 不知道 ESP32 的初始 LED 状态。

解决方案:在 MQTT 连接时立即发布当前状态:

void connectToMQTT() {
if (client.connect("esp32-device")) {
// 发布当前状态
client.publish("esp32/status/led", ledState ? "ON" : "OFF", true);
client.subscribe("esp32/control/led");
}
}

true 参数使其成为保留消息。

  • 层级化设计主题位置/设备/传感器 结构使过滤更容易
  • 对状态使用保留消息client.publish(topic, payload, true) — 代理保留最新值供后来订阅者使用
  • 状态变更时发布确认:执行命令后,向状态主题发布新状态
  • 包含状态心跳:定期发布运行时间或”仍在运行”消息
  • 保持载荷小巧:JSON 没问题,但生产环境应避免不必要的空白
  • 每个项目使用独特的主题结构:参见项目章节(02-14)了解特定主题设计
  1. 双向 MQTT 涉及 ESP32 发布传感器数据和订阅命令
  2. 主题设计遵循层级结构:位置/设备/数据类型
  3. ESP32 定期或事件触发时发布;启动时订阅命令主题
  4. Node-RED 订阅 ESP32 数据主题并向 ESP32 命令主题发布
  5. 在设置 Node-RED 之前使用 MQTT Explorer 测试可简化调试
  6. 始终通过发布反馈消息来确认状态变更