跳转到内容

MQTT 数据传输

MQTT 数据传输

本节介绍如何将 ESP32 采集的传感器数据通过 MQTT 协议发送到服务器端。学习完成后,您将能够:

  • 在 ESP32 上配置 MQTT 客户端连接
  • 实现 JSON 数据的 MQTT 发布
  • 实现 MQTT 订阅和命令接收
  • 处理 MQTT 断线重连

在开始本节之前,请确保:

  • 已完成 MQTT 协议基础学习
  • 已完成传感器数据读取逻辑
  • Mosquitto 或 EMQX Broker 已运行
  • MQTT Explorer 或 mosquitto_sub 工具已就绪

ESP32 上使用 PubSubClient 库实现 MQTT 通信:

Arduino IDE 安装

工具 → 管理库 → 搜索 "PubSubClient" by Nick O'Leary → 安装

PlatformIO 配置

; platformio.ini
lib_deps =
knolleary/PubSubClient@^2.8
bblanchon/ArduinoJson@^6.21
// wifi_mqtt.h - WiFi 和 MQTT 连接管理头文件
#ifndef WIFI_MQTT_H
#define WIFI_MQTT_H
#include <WiFi.h>
#include <PubSubClient.h>
extern WiFiClient espClient;
extern PubSubClient mqttClient;
// 连接状态回调
extern void (*onMqttConnected)();
extern void (*onMqttMessage)(String topic, String payload);
bool connectWiFi();
bool connectMQTT();
void checkConnections();
void mqttCallback(char* topic, byte* payload, unsigned int length);
#endif
// wifi_mqtt.cpp - WiFi 和 MQTT 连接管理实现
#include "wifi_mqtt.h"
#include "config.h"
WiFiClient espClient;
PubSubClient mqttClient(espClient);
// 回调函数指针
void (*onMqttConnected)() = NULL;
void (*onMqttMessage)(String topic, String payload) = NULL;
// WiFi 连接状态
unsigned long lastWifiRetry = 0;
const unsigned long WIFI_RETRY_INTERVAL = 10000; // 10 秒重试
// MQTT 连接状态
unsigned long lastMqttRetry = 0;
const unsigned long MQTT_RETRY_INTERVAL = 5000; // 5 秒重试
bool connectWiFi() {
if (WiFi.status() == WL_CONNECTED) {
return true;
}
Serial.print("Connecting to WiFi: ");
Serial.println(WIFI_SSID);
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 20) {
delay(500);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected!");
Serial.print("IP Address: ");
Serial.println(WiFi.localIP());
return true;
} else {
Serial.println("\nWiFi connection failed!");
return false;
}
}
bool connectMQTT() {
if (mqttClient.connected()) {
return true;
}
Serial.print("Connecting to MQTT Broker: ");
Serial.println(MQTT_BROKER);
mqttClient.setServer(MQTT_BROKER, MQTT_PORT);
mqttClient.setCallback(mqttCallback);
// 尝试连接(使用 Last Will 遗嘱消息)
String willTopic = String(MQTT_TOPIC) + "/status";
String willMessage = "{\"device\":\"" + String(DEVICE_ID) + "\",\"status\":\"offline\"}";
if (mqttClient.connect(
MQTT_CLIENT_ID,
MQTT_USER,
MQTT_PASSWORD,
willTopic.c_str(),
0, // QoS
true, // retain
willMessage.c_str()
)) {
Serial.println("MQTT connected!");
// 发布上线消息
String onlineMsg = "{\"device\":\"" + String(DEVICE_ID) + "\",\"status\":\"online\"}";
mqttClient.publish(willTopic.c_str(), onlineMsg.c_str(), true);
// 订阅控制 Topic
String controlTopic = String(MQTT_TOPIC) + "/control";
mqttClient.subscribe(controlTopic.c_str());
Serial.print("Subscribed to: ");
Serial.println(controlTopic);
// 触发连接回调
if (onMqttConnected != NULL) {
onMqttConnected();
}
return true;
} else {
Serial.print("MQTT connection failed, state: ");
Serial.println(mqttClient.state());
return false;
}
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// 将 payload 转换为 String
String message;
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
String topicStr = String(topic);
Serial.print("MQTT received [");
Serial.print(topicStr);
Serial.print("]: ");
Serial.println(message);
// 触发消息回调
if (onMqttMessage != NULL) {
onMqttMessage(topicStr, message);
}
}
void checkConnections() {
unsigned long now = millis();
// WiFi 断线重连
if (WiFi.status() != WL_CONNECTED) {
if (now - lastWifiRetry > WIFI_RETRY_INTERVAL) {
lastWifiRetry = now;
connectWiFi();
}
}
// MQTT 断线重连
if (WiFi.status() == WL_CONNECTED && !mqttClient.connected()) {
if (now - lastMqttRetry > MQTT_RETRY_INTERVAL) {
lastMqttRetry = now;
connectMQTT();
}
}
// 维护 MQTT 连接
if (mqttClient.connected()) {
mqttClient.loop();
}
}
// 使用 ArduinoJson 构建和发布数据
#include <ArduinoJson.h>
#include "config.h"
#include "wifi_mqtt.h"
// 发布传感器数据到 MQTT
void publishSensorData(float temperature, float humidity, float lux) {
if (!mqttClient.connected()) {
Serial.println("MQTT not connected, skipping publish");
return;
}
// 创建 JSON 文档
StaticJsonDocument<256> doc;
doc["device_id"] = DEVICE_ID;
doc["location"] = DEVICE_LOCATION;
doc["timestamp"] = millis() / 1000; // 相对时间戳
// 温度
JsonObject temp = doc.createNestedObject("temperature");
temp["value"] = temperature;
temp["unit"] = "celsius";
// 湿度
JsonObject hum = doc.createNestedObject("humidity");
hum["value"] = humidity;
hum["unit"] = "percent";
// 光照
JsonObject light = doc.createNestedObject("lux");
light["value"] = lux;
light["unit"] = "lux";
// 信号质量
JsonObject quality = doc.createNestedObject("quality");
quality["signal_rssi"] = WiFi.RSSI();
quality["wifi_strength"] = map(WiFi.RSSI(), -100, -30, 0, 100);
// 序列化 JSON
char buffer[256];
size_t n = serializeJson(doc, buffer);
// 发布到 MQTT
if (mqttClient.publish(MQTT_TOPIC, buffer, true)) {
Serial.print("Published to ");
Serial.print(MQTT_TOPIC);
Serial.print(": ");
Serial.println(buffer);
} else {
Serial.println("Publish failed!");
}
}

也可以将各个传感器数据发布到不同的 Topic:

void publishIndividualTopics(float temperature, float humidity, float lux) {
char buffer[64];
// 温度
dtostrf(temperature, 4, 1, buffer);
mqttClient.publish(MQTT_TOPIC_TEMP, buffer, true);
// 湿度
dtostrf(humidity, 4, 1, buffer);
mqttClient.publish(MQTT_TOPIC_HUM, buffer, true);
// 光照
dtostrf(lux, 4, 0, buffer);
mqttClient.publish(MQTT_TOPIC_LIGHT, buffer, true);
}
/// 控制命令处理
// 在 config.h 中添加
#define FAN_PIN 18
// 命令回调函数
void onMqttMessageHandler(String topic, String payload) {
// 检查是否为控制命令
if (topic.endsWith("/control")) {
StaticJsonDocument<128> doc;
DeserializationError error = deserializeJson(doc, payload);
if (error) {
Serial.print("JSON parsing failed: ");
Serial.println(error.c_str());
return;
}
// 解析命令
const char* command = doc["command"];
const char* target = doc["target"];
if (strcmp(target, "fan") == 0) {
if (strcmp(command, "on") == 0) {
digitalWrite(FAN_PIN, HIGH);
Serial.println("Fan turned ON");
// 回复状态
mqttClient.publish(
(String(MQTT_TOPIC) + "/fan/status").c_str(),
"{\"fan\":\"on\"}",
true
);
} else if (strcmp(command, "off") == 0) {
digitalWrite(FAN_PIN, LOW);
Serial.println("Fan turned OFF");
mqttClient.publish(
(String(MQTT_TOPIC) + "/fan/status").c_str(),
"{\"fan\":\"off\"}",
true
);
}
}
}
}
#include <Arduino.h>
#include "config.h"
#include "wifi_mqtt.h"
#include "sensors.h"
// 传感器对象
SensorManager sensorManager;
// 采集状态
unsigned long lastSampleTime = 0;
void setup() {
Serial.begin(115200);
Serial.println("\n=== Factory Environment Monitor v2.0 ===");
// 初始化传感器
sensorManager.begin();
// 连接 WiFi
connectWiFi();
// 设置 MQTT 回调
onMqttMessage = onMqttMessageHandler;
// 连接 MQTT
connectMQTT();
}
void loop() {
// 维护连接
checkConnections();
// 定时采集和发布
unsigned long now = millis();
if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) {
lastSampleTime = now;
// 读取传感器
SensorData data = sensorManager.readAll();
if (data.valid) {
// 通过 MQTT 发布
publishSensorData(data.temperature, data.humidity, data.lux);
}
}
}
Terminal window
# 在服务器终端使用 mosquitto_sub 订阅 Topic 验证数据接收
mosquitto_sub -h localhost -t "factory/zone1/environment" -v

预期输出

factory/zone1/environment {"device_id":"ESP32_001","location":"Factory Zone 1","timestamp":1697123456,"temperature":{"value":26.5,"unit":"celsius"},"humidity":{"value":62.3,"unit":"percent"},"lux":{"value":450,"unit":"lux"}}

验证检查清单

  • MQTT Broker 能收到 ESP32 发布的 JSON 数据
  • 数据每 3 秒更新一次
  • 设备离线后显示 “offline” 状态消息
  • 控制命令能正确执行(如风扇开关)

症状:串口输出 “MQTT connection failed, state: -4”

可能原因

  • Broker 地址错误
  • 端口被防火墙阻止
  • 客户端 ID 冲突

解决方案

  1. 使用 ping 测试到 Broker 的网络连通性
  2. 确认 MQTT 端口 (1883) 未被防火墙阻止
  3. 修改客户端 ID 确保唯一性

症状mqttClient.publish() 返回 false

可能原因

  • MQTT 连接已断开
  • 消息体超过最大限制
  • 发布频率过高

解决方案

  1. 在发布前检查 mqttClient.connected()
  2. 减小 JSON payload 大小
  3. 降低发布频率或使用 QoS 0
  • 推荐: 使用保留消息 (retained) 设备状态和最新数据
  • 推荐: 实现遗嘱消息 (Last Will) 通知设备离线
  • 推荐: 控制 Topic 和 数据 Topic 分开管理
  • 避免: 在回调函数中使用阻塞操作或过长处理
  • 避免: 频繁重连(应使用指数退避策略)
  • 避免: 在 Topic 中包含设备 IP 等变化信息

本节要点总结:

  1. MQTT 客户端:使用 PubSubClient 库实现 ESP32 的 MQTT 通信
  2. 连接管理:WiFi 和 MQTT 分别独立处理断线重连
  3. 数据发布:使用 ArduinoJson 构建标准 JSON 格式,发布到 MQTT
  4. 命令接收:设备订阅控制 Topic,解析 JSON 命令执行操作
  5. 遗嘱消息:通过 Last Will 通知其他系统组件设备的在线状态