回调函数处理
回调函数处理
Section titled “回调函数处理”本节介绍 MQTT 回调函数,它在 ESP32 上处理传入的消息。学完本节后,你将能够:
- 理解回调函数的签名和参数
- 实现基于主题的消息路由
- 处理不同类型的载荷(字符串、数字、JSON)
- 管理由 MQTT 命令触发的状态变更
- MQTT 客户端设置(01-11)
- JSON 消息处理(01-14)
- 基础草图架构(01-12)
MQTT 回调函数通过 client.setCallback() 注册,当订阅的主题上有消息到达时自动被调用。
函数签名:
void callback(char* topic, byte* payload, unsigned int length);| 参数 | 类型 | 描述 |
|---|---|---|
topic | char* | MQTT 主题(以 null 结尾的字符串) |
payload | byte* | 原始消息字节 |
length | unsigned int | 载荷的字节长度 |
重要提示:回调函数在 client.loop() 的上下文中执行。务必保持快速——不要在回调中使用 delay() 或长时间运行的操作。如果需要长时间运行的操作,请设置一个标志并在主 loop() 中执行。
回调的工作原理
Section titled “回调的工作原理”[MQTT Broker] --消息--> [PubSubClient] --回调--> [你的代码] │ client.loop() (每次迭代调用)当 client.loop() 运行时:
- 检查 MQTT 套接字上是否有传入数据
- 如果完整的消息已到达,提取主题和载荷
- 调用注册的回调函数
- 你的回调函数处理消息
步骤 1:基本回调结构
Section titled “步骤 1:基本回调结构”#include <WiFi.h>#include <PubSubClient.h>
WiFiClient wifiClient;PubSubClient client(wifiClient);
void setup() { Serial.begin(115200);
// ... Wi-Fi 连接 ...
client.setServer("192.168.1.100", 1883); client.setCallback(callback); // 注册回调函数 client.connect("esp32-callback");
// 订阅主题 client.subscribe("esp32/control/#"); // 多级通配符}
void loop() { if (!client.connected()) { // 重连 } client.loop(); // 处理传入消息 → 触发回调}
// 回调函数void callback(char* topic, byte* payload, unsigned int length) { // 步骤 1:将载荷转换为字符串 String message = ""; for (unsigned int i = 0; i < length; i++) { message += (char)payload[i]; }
// 步骤 2:记录传入消息 Serial.print("收到消息 ["); Serial.print(topic); Serial.print("]: "); Serial.println(message);
// 步骤 3:基于主题路由 if (String(topic) == "esp32/control/led") { if (message == "ON") { digitalWrite(2, HIGH); } else if (message == "OFF") { digitalWrite(2, LOW); } }}步骤 2:基于主题的多主题路由
Section titled “步骤 2:基于主题的多主题路由”对于订阅了多个主题的项目,使用结构化的路由方法:
void callback(char* topic, byte* payload, unsigned int length) { // 转换载荷 String message = ""; for (unsigned int i = 0; i < length; i++) { message += (char)payload[i]; }
String topicStr = String(topic);
// 按主题前缀路由 if (topicStr.startsWith("esp32/control/led")) { handleLEDCommand(topicStr, message); } else if (topicStr.startsWith("esp32/control/sensor")) { handleSensorCommand(message); } else if (topicStr.startsWith("esp32/control/config")) { handleConfigCommand(message); } else { Serial.print("未知主题: "); Serial.println(topicStr); }}
void handleLEDCommand(String topic, String message) { // 解析特定的 LED 子主题 if (topic.endsWith("/brightness")) { int brightness = message.toInt(); ledcWrite(0, brightness); Serial.print("LED 亮度设置为: "); Serial.println(brightness); } else if (topic.endsWith("/state")) { if (message == "ON") { digitalWrite(2, HIGH); } else { digitalWrite(2, LOW); } }}
void handleSensorCommand(String message) { int interval = message.toInt(); if (interval >= 1000 && interval <= 60000) { sensorInterval = interval; Serial.print("传感器间隔更改为: "); Serial.print(interval); Serial.println(" ms"); }}
void handleConfigCommand(String message) { // 解析 JSON 配置 // 参见 01-13 ArduinoJson 解析 StaticJsonDocument<200> doc; DeserializationError error = deserializeJson(doc, message);
if (!error) { if (doc.containsKey("threshold")) { temperatureThreshold = doc["threshold"]; } if (doc.containsKey("mode")) { const char* mode = doc["mode"]; Serial.print("模式更改为: "); Serial.println(mode); } }}步骤 3:使用全局状态标志(非阻塞模式)
Section titled “步骤 3:使用全局状态标志(非阻塞模式)”避免在回调内部执行长时间操作。改用标志:
// 由回调设置的全局标志,在 loop() 中检查volatile bool ledToggleRequested = false;volatile bool sensorResetRequested = false;volatile int newPublishInterval = -1;
void callback(char* topic, byte* payload, unsigned int length) { String message = ""; for (unsigned int i = 0; i < length; i++) { message += (char)payload[i]; }
String topicStr = String(topic);
// 设置标志(快速——不阻塞 MQTT) if (topicStr == "esp32/control/led/toggle") { ledToggleRequested = true; // 将在 loop() 中处理 } else if (topicStr == "esp32/control/reset") { sensorResetRequested = true; } else if (topicStr == "esp32/control/interval") { newPublishInterval = message.toInt(); // 将在 loop() 中应用 }}
void loop() { client.loop();
// 处理标志(可能耗时,但在这里没问题) if (ledToggleRequested) { ledToggleRequested = false; digitalWrite(2, !digitalRead(2)); Serial.println("通过标志切换 LED"); }
if (sensorResetRequested) { sensorResetRequested = false; resetSensorReadings(); Serial.println("传感器重置已执行"); }
if (newPublishInterval > 0) { PUBLISH_INTERVAL = newPublishInterval; Serial.printf("发布间隔更改为 %d ms\n", PUBLISH_INTERVAL); newPublishInterval = -1; }
// ... 正常循环代码 ...}步骤 4:处理大型载荷
Section titled “步骤 4:处理大型载荷”对于超过 256 字节的 JSON 载荷,采用不同的处理方式:
void callback(char* topic, byte* payload, unsigned int length) { // 对于大型载荷,使用 DynamicJsonDocument // 将载荷复制到以 null 结尾的缓冲区 char* jsonBuffer = new char[length + 1]; memcpy(jsonBuffer, payload, length); jsonBuffer[length] = '\0';
// 解析 DynamicJsonDocument doc(1024); DeserializationError error = deserializeJson(doc, jsonBuffer);
if (!error) { const char* command = doc["command"]; JsonArray values = doc["values"];
Serial.print("命令: "); Serial.println(command); Serial.print("值数量: "); Serial.println(values.size()); }
delete[] jsonBuffer; // 释放内存}步骤 5:使用类方法的回调(面向对象)
Section titled “步骤 5:使用类方法的回调(面向对象)”适用于使用类的进阶项目:
class MQTTHandler { private: PubSubClient* client;
public: MQTTHandler(PubSubClient* mqttClient) : client(mqttClient) { client->setCallback(std::bind(&MQTTHandler::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); }
void onMessage(char* topic, byte* payload, unsigned int length) { // 现在这是类方法——可以访问成员变量 String message = ""; for (int i = 0; i < length; i++) message += (char)payload[i];
Serial.printf("[%s] %s\n", topic, message.c_str());
// 基于主题处理 processCommand(String(topic), message); }
void processCommand(String topic, String message) { // 类级别处理 }};- 订阅的主题上有消息到达时,回调被调用
- 基于主题的路由正确地将消息指向对应的处理函数
- 回调中设置的全局标志在主循环中被处理
- 大型 JSON 载荷成功被解析
- 回调不阻塞
client.loop()的处理
回调未被调用
Section titled “回调未被调用”原因:
- 在
client.connect()之前未调用client.setCallback(callback) - 主循环中未调用
client.loop() - 主题订阅失败
解决方案:
- 验证在
setup()中的connect()之前已调用setCallback() - 在回调内部添加 Serial 打印以确认已到达
- 在
loop()的开头调用client.loop(),而不是在延时之后 - 使用 MQTT Explorer 验证主题和载荷是否正确
回调收到乱码数据
Section titled “回调收到乱码数据”原因:载荷包含 null 字节或二进制数据。
解决方案:将载荷转换为字符串时,检查 null 字节:
String message = "";for (unsigned int i = 0; i < length; i++) { if (payload[i] != '\0') { // 跳过 null 字节 message += (char)payload[i]; }}单个消息触发了多次回调
Section titled “单个消息触发了多次回调”原因:同一次循环迭代中多次调用 client.loop()。
解决方案:确保每次 loop() 迭代只调用一次 client.loop()。移除任何额外的调用。
- 保持回调快速执行:绝不在回调中使用
delay()、长循环或阻塞操作 - 对慢操作使用标志变量:在回调中设置标志,在
loop()中执行操作 - 在路由前先转换载荷:将
byte*转换为String一次,而不是在每个处理函数中都转换 - 使用 Serial 记录传入消息:有助于在开发过程中诊断问题
- 使用主题前缀组织:
项目/设备/动作结构使路由更清晰 - 考虑对标志使用
volatile:当在回调中设置标志而在loop()中读取时
- 回调函数接收主题、载荷和长度——先将载荷转换为字符串
- 基于主题的路由将消息指向特定的处理函数
- 对长时间操作使用全局标志,以避免阻塞
client.loop() - 使用 DynamicJsonDocument 在堆上处理大型载荷
- 回调必须快速执行——它在 MQTT 处理路径中运行
- 对于面向对象设计,可以通过
std::bind实现基于类的回调