MQTT 数据传输
MQTT 数据传输
本节介绍如何将 ESP32 采集的传感器数据通过 MQTT 协议发送到服务器端。学习完成后,您将能够:
- 在 ESP32 上配置 MQTT 客户端连接
- 实现 JSON 数据的 MQTT 发布
- 实现 MQTT 订阅和命令接收
- 处理 MQTT 断线重连
在开始本节之前,请确保:
- 已完成 MQTT 协议基础学习
- 已完成传感器数据读取逻辑
- Mosquitto 或 EMQX Broker 已运行
- MQTT Explorer 或 mosquitto_sub 工具已就绪
MQTT Client Setup
Section titled “MQTT Client Setup”Required Library
Section titled “Required Library”ESP32 上使用 PubSubClient 库实现 MQTT 通信:
Arduino IDE 安装:
工具 → 管理库 → 搜索 "PubSubClient" by Nick O'Leary → 安装PlatformIO 配置:
; platformio.inilib_deps = knolleary/PubSubClient@^2.8 bblanchon/ArduinoJson@^6.21WiFi + MQTT Connection Manager
Section titled “WiFi + MQTT Connection Manager”// wifi_mqtt.h - WiFi 和 MQTT 连接管理头文件#ifndef WIFI_MQTT_H#define WIFI_MQTT_H
#include <WiFi.h>#include <PubSubClient.h>
extern WiFiClient espClient;extern PubSubClient mqttClient;
// 连接状态回调extern void (*onMqttConnected)();extern void (*onMqttMessage)(String topic, String payload);
bool connectWiFi();bool connectMQTT();void checkConnections();void mqttCallback(char* topic, byte* payload, unsigned int length);
#endif// wifi_mqtt.cpp - WiFi 和 MQTT 连接管理实现#include "wifi_mqtt.h"#include "config.h"
WiFiClient espClient;PubSubClient mqttClient(espClient);
// 回调函数指针void (*onMqttConnected)() = NULL;void (*onMqttMessage)(String topic, String payload) = NULL;
// WiFi 连接状态unsigned long lastWifiRetry = 0;const unsigned long WIFI_RETRY_INTERVAL = 10000; // 10 秒重试
// MQTT 连接状态unsigned long lastMqttRetry = 0;const unsigned long MQTT_RETRY_INTERVAL = 5000; // 5 秒重试
bool connectWiFi() { if (WiFi.status() == WL_CONNECTED) { return true; }
Serial.print("Connecting to WiFi: "); Serial.println(WIFI_SSID);
WiFi.mode(WIFI_STA); WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
int attempts = 0; while (WiFi.status() != WL_CONNECTED && attempts < 20) { delay(500); Serial.print("."); attempts++; }
if (WiFi.status() == WL_CONNECTED) { Serial.println("\nWiFi connected!"); Serial.print("IP Address: "); Serial.println(WiFi.localIP()); return true; } else { Serial.println("\nWiFi connection failed!"); return false; }}
bool connectMQTT() { if (mqttClient.connected()) { return true; }
Serial.print("Connecting to MQTT Broker: "); Serial.println(MQTT_BROKER);
mqttClient.setServer(MQTT_BROKER, MQTT_PORT); mqttClient.setCallback(mqttCallback);
// 尝试连接(使用 Last Will 遗嘱消息) String willTopic = String(MQTT_TOPIC) + "/status"; String willMessage = "{\"device\":\"" + String(DEVICE_ID) + "\",\"status\":\"offline\"}";
if (mqttClient.connect( MQTT_CLIENT_ID, MQTT_USER, MQTT_PASSWORD, willTopic.c_str(), 0, // QoS true, // retain willMessage.c_str() )) { Serial.println("MQTT connected!");
// 发布上线消息 String onlineMsg = "{\"device\":\"" + String(DEVICE_ID) + "\",\"status\":\"online\"}"; mqttClient.publish(willTopic.c_str(), onlineMsg.c_str(), true);
// 订阅控制 Topic String controlTopic = String(MQTT_TOPIC) + "/control"; mqttClient.subscribe(controlTopic.c_str()); Serial.print("Subscribed to: "); Serial.println(controlTopic);
// 触发连接回调 if (onMqttConnected != NULL) { onMqttConnected(); }
return true; } else { Serial.print("MQTT connection failed, state: "); Serial.println(mqttClient.state()); return false; }}
void mqttCallback(char* topic, byte* payload, unsigned int length) { // 将 payload 转换为 String String message; for (unsigned int i = 0; i < length; i++) { message += (char)payload[i]; }
String topicStr = String(topic);
Serial.print("MQTT received ["); Serial.print(topicStr); Serial.print("]: "); Serial.println(message);
// 触发消息回调 if (onMqttMessage != NULL) { onMqttMessage(topicStr, message); }}
void checkConnections() { unsigned long now = millis();
// WiFi 断线重连 if (WiFi.status() != WL_CONNECTED) { if (now - lastWifiRetry > WIFI_RETRY_INTERVAL) { lastWifiRetry = now; connectWiFi(); } }
// MQTT 断线重连 if (WiFi.status() == WL_CONNECTED && !mqttClient.connected()) { if (now - lastMqttRetry > MQTT_RETRY_INTERVAL) { lastMqttRetry = now; connectMQTT(); } }
// 维护 MQTT 连接 if (mqttClient.connected()) { mqttClient.loop(); }}MQTT Data Publishing
Section titled “MQTT Data Publishing”JSON Data Publication
Section titled “JSON Data Publication”// 使用 ArduinoJson 构建和发布数据#include <ArduinoJson.h>#include "config.h"#include "wifi_mqtt.h"
// 发布传感器数据到 MQTTvoid publishSensorData(float temperature, float humidity, float lux) { if (!mqttClient.connected()) { Serial.println("MQTT not connected, skipping publish"); return; }
// 创建 JSON 文档 StaticJsonDocument<256> doc;
doc["device_id"] = DEVICE_ID; doc["location"] = DEVICE_LOCATION; doc["timestamp"] = millis() / 1000; // 相对时间戳
// 温度 JsonObject temp = doc.createNestedObject("temperature"); temp["value"] = temperature; temp["unit"] = "celsius";
// 湿度 JsonObject hum = doc.createNestedObject("humidity"); hum["value"] = humidity; hum["unit"] = "percent";
// 光照 JsonObject light = doc.createNestedObject("lux"); light["value"] = lux; light["unit"] = "lux";
// 信号质量 JsonObject quality = doc.createNestedObject("quality"); quality["signal_rssi"] = WiFi.RSSI(); quality["wifi_strength"] = map(WiFi.RSSI(), -100, -30, 0, 100);
// 序列化 JSON char buffer[256]; size_t n = serializeJson(doc, buffer);
// 发布到 MQTT if (mqttClient.publish(MQTT_TOPIC, buffer, true)) { Serial.print("Published to "); Serial.print(MQTT_TOPIC); Serial.print(": "); Serial.println(buffer); } else { Serial.println("Publish failed!"); }}Individual Topic Publishing
Section titled “Individual Topic Publishing”也可以将各个传感器数据发布到不同的 Topic:
void publishIndividualTopics(float temperature, float humidity, float lux) { char buffer[64];
// 温度 dtostrf(temperature, 4, 1, buffer); mqttClient.publish(MQTT_TOPIC_TEMP, buffer, true);
// 湿度 dtostrf(humidity, 4, 1, buffer); mqttClient.publish(MQTT_TOPIC_HUM, buffer, true);
// 光照 dtostrf(lux, 4, 0, buffer); mqttClient.publish(MQTT_TOPIC_LIGHT, buffer, true);}MQTT Command Reception
Section titled “MQTT Command Reception”Control Command Handling
Section titled “Control Command Handling”/// 控制命令处理// 在 config.h 中添加#define FAN_PIN 18
// 命令回调函数void onMqttMessageHandler(String topic, String payload) { // 检查是否为控制命令 if (topic.endsWith("/control")) { StaticJsonDocument<128> doc; DeserializationError error = deserializeJson(doc, payload);
if (error) { Serial.print("JSON parsing failed: "); Serial.println(error.c_str()); return; }
// 解析命令 const char* command = doc["command"]; const char* target = doc["target"];
if (strcmp(target, "fan") == 0) { if (strcmp(command, "on") == 0) { digitalWrite(FAN_PIN, HIGH); Serial.println("Fan turned ON");
// 回复状态 mqttClient.publish( (String(MQTT_TOPIC) + "/fan/status").c_str(), "{\"fan\":\"on\"}", true ); } else if (strcmp(command, "off") == 0) { digitalWrite(FAN_PIN, LOW); Serial.println("Fan turned OFF");
mqttClient.publish( (String(MQTT_TOPIC) + "/fan/status").c_str(), "{\"fan\":\"off\"}", true ); } } }}Complete Integration
Section titled “Complete Integration”Integrated main.cpp
Section titled “Integrated main.cpp”#include <Arduino.h>#include "config.h"#include "wifi_mqtt.h"#include "sensors.h"
// 传感器对象SensorManager sensorManager;
// 采集状态unsigned long lastSampleTime = 0;
void setup() { Serial.begin(115200); Serial.println("\n=== Factory Environment Monitor v2.0 ===");
// 初始化传感器 sensorManager.begin();
// 连接 WiFi connectWiFi();
// 设置 MQTT 回调 onMqttMessage = onMqttMessageHandler;
// 连接 MQTT connectMQTT();}
void loop() { // 维护连接 checkConnections();
// 定时采集和发布 unsigned long now = millis(); if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) { lastSampleTime = now;
// 读取传感器 SensorData data = sensorManager.readAll();
if (data.valid) { // 通过 MQTT 发布 publishSensorData(data.temperature, data.humidity, data.lux); } }}验证 MQTT 数据发送
Section titled “验证 MQTT 数据发送”# 在服务器终端使用 mosquitto_sub 订阅 Topic 验证数据接收mosquitto_sub -h localhost -t "factory/zone1/environment" -v预期输出:
factory/zone1/environment {"device_id":"ESP32_001","location":"Factory Zone 1","timestamp":1697123456,"temperature":{"value":26.5,"unit":"celsius"},"humidity":{"value":62.3,"unit":"percent"},"lux":{"value":450,"unit":"lux"}}验证检查清单:
- MQTT Broker 能收到 ESP32 发布的 JSON 数据
- 数据每 3 秒更新一次
- 设备离线后显示 “offline” 状态消息
- 控制命令能正确执行(如风扇开关)
Issue 1: MQTT 连接失败
Section titled “Issue 1: MQTT 连接失败”症状:串口输出 “MQTT connection failed, state: -4”
可能原因:
- Broker 地址错误
- 端口被防火墙阻止
- 客户端 ID 冲突
解决方案:
- 使用
ping测试到 Broker 的网络连通性 - 确认 MQTT 端口 (1883) 未被防火墙阻止
- 修改客户端 ID 确保唯一性
Issue 2: 数据发布失败
Section titled “Issue 2: 数据发布失败”症状:mqttClient.publish() 返回 false
可能原因:
- MQTT 连接已断开
- 消息体超过最大限制
- 发布频率过高
解决方案:
- 在发布前检查
mqttClient.connected() - 减小 JSON payload 大小
- 降低发布频率或使用 QoS 0
- ✅ 推荐: 使用保留消息 (retained) 设备状态和最新数据
- ✅ 推荐: 实现遗嘱消息 (Last Will) 通知设备离线
- ✅ 推荐: 控制 Topic 和 数据 Topic 分开管理
- ❌ 避免: 在回调函数中使用阻塞操作或过长处理
- ❌ 避免: 频繁重连(应使用指数退避策略)
- ❌ 避免: 在 Topic 中包含设备 IP 等变化信息
Summary
Section titled “Summary”本节要点总结:
- MQTT 客户端:使用 PubSubClient 库实现 ESP32 的 MQTT 通信
- 连接管理:WiFi 和 MQTT 分别独立处理断线重连
- 数据发布:使用 ArduinoJson 构建标准 JSON 格式,发布到 MQTT
- 命令接收:设备订阅控制 Topic,解析 JSON 命令执行操作
- 遗嘱消息:通过 Last Will 通知其他系统组件设备的在线状态