跳转到内容

回调函数处理

本节介绍 MQTT 回调函数,它在 ESP32 上处理传入的消息。学完本节后,你将能够:

  • 理解回调函数的签名和参数
  • 实现基于主题的消息路由
  • 处理不同类型的载荷(字符串、数字、JSON)
  • 管理由 MQTT 命令触发的状态变更
  • MQTT 客户端设置(01-11)
  • JSON 消息处理(01-14)
  • 基础草图架构(01-12)

MQTT 回调函数通过 client.setCallback() 注册,当订阅的主题上有消息到达时自动被调用。

函数签名

void callback(char* topic, byte* payload, unsigned int length);
参数类型描述
topicchar*MQTT 主题(以 null 结尾的字符串)
payloadbyte*原始消息字节
lengthunsigned int载荷的字节长度

重要提示:回调函数在 client.loop() 的上下文中执行。务必保持快速——不要在回调中使用 delay() 或长时间运行的操作。如果需要长时间运行的操作,请设置一个标志并在主 loop() 中执行。

[MQTT Broker] --消息--> [PubSubClient] --回调--> [你的代码]
client.loop()
(每次迭代调用)

client.loop() 运行时:

  1. 检查 MQTT 套接字上是否有传入数据
  2. 如果完整的消息已到达,提取主题和载荷
  3. 调用注册的回调函数
  4. 你的回调函数处理消息
#include <WiFi.h>
#include <PubSubClient.h>
WiFiClient wifiClient;
PubSubClient client(wifiClient);
void setup() {
Serial.begin(115200);
// ... Wi-Fi 连接 ...
client.setServer("192.168.1.100", 1883);
client.setCallback(callback); // 注册回调函数
client.connect("esp32-callback");
// 订阅主题
client.subscribe("esp32/control/#"); // 多级通配符
}
void loop() {
if (!client.connected()) {
// 重连
}
client.loop(); // 处理传入消息 → 触发回调
}
// 回调函数
void callback(char* topic, byte* payload, unsigned int length) {
// 步骤 1:将载荷转换为字符串
String message = "";
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
// 步骤 2:记录传入消息
Serial.print("收到消息 [");
Serial.print(topic);
Serial.print("]: ");
Serial.println(message);
// 步骤 3:基于主题路由
if (String(topic) == "esp32/control/led") {
if (message == "ON") {
digitalWrite(2, HIGH);
} else if (message == "OFF") {
digitalWrite(2, LOW);
}
}
}

对于订阅了多个主题的项目,使用结构化的路由方法:

void callback(char* topic, byte* payload, unsigned int length) {
// 转换载荷
String message = "";
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
String topicStr = String(topic);
// 按主题前缀路由
if (topicStr.startsWith("esp32/control/led")) {
handleLEDCommand(topicStr, message);
} else if (topicStr.startsWith("esp32/control/sensor")) {
handleSensorCommand(message);
} else if (topicStr.startsWith("esp32/control/config")) {
handleConfigCommand(message);
} else {
Serial.print("未知主题: ");
Serial.println(topicStr);
}
}
void handleLEDCommand(String topic, String message) {
// 解析特定的 LED 子主题
if (topic.endsWith("/brightness")) {
int brightness = message.toInt();
ledcWrite(0, brightness);
Serial.print("LED 亮度设置为: ");
Serial.println(brightness);
} else if (topic.endsWith("/state")) {
if (message == "ON") {
digitalWrite(2, HIGH);
} else {
digitalWrite(2, LOW);
}
}
}
void handleSensorCommand(String message) {
int interval = message.toInt();
if (interval >= 1000 && interval <= 60000) {
sensorInterval = interval;
Serial.print("传感器间隔更改为: ");
Serial.print(interval);
Serial.println(" ms");
}
}
void handleConfigCommand(String message) {
// 解析 JSON 配置
// 参见 01-13 ArduinoJson 解析
StaticJsonDocument<200> doc;
DeserializationError error = deserializeJson(doc, message);
if (!error) {
if (doc.containsKey("threshold")) {
temperatureThreshold = doc["threshold"];
}
if (doc.containsKey("mode")) {
const char* mode = doc["mode"];
Serial.print("模式更改为: ");
Serial.println(mode);
}
}
}

步骤 3:使用全局状态标志(非阻塞模式)

Section titled “步骤 3:使用全局状态标志(非阻塞模式)”

避免在回调内部执行长时间操作。改用标志:

// 由回调设置的全局标志,在 loop() 中检查
volatile bool ledToggleRequested = false;
volatile bool sensorResetRequested = false;
volatile int newPublishInterval = -1;
void callback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
String topicStr = String(topic);
// 设置标志(快速——不阻塞 MQTT)
if (topicStr == "esp32/control/led/toggle") {
ledToggleRequested = true; // 将在 loop() 中处理
} else if (topicStr == "esp32/control/reset") {
sensorResetRequested = true;
} else if (topicStr == "esp32/control/interval") {
newPublishInterval = message.toInt(); // 将在 loop() 中应用
}
}
void loop() {
client.loop();
// 处理标志(可能耗时,但在这里没问题)
if (ledToggleRequested) {
ledToggleRequested = false;
digitalWrite(2, !digitalRead(2));
Serial.println("通过标志切换 LED");
}
if (sensorResetRequested) {
sensorResetRequested = false;
resetSensorReadings();
Serial.println("传感器重置已执行");
}
if (newPublishInterval > 0) {
PUBLISH_INTERVAL = newPublishInterval;
Serial.printf("发布间隔更改为 %d ms\n", PUBLISH_INTERVAL);
newPublishInterval = -1;
}
// ... 正常循环代码 ...
}

对于超过 256 字节的 JSON 载荷,采用不同的处理方式:

void callback(char* topic, byte* payload, unsigned int length) {
// 对于大型载荷,使用 DynamicJsonDocument
// 将载荷复制到以 null 结尾的缓冲区
char* jsonBuffer = new char[length + 1];
memcpy(jsonBuffer, payload, length);
jsonBuffer[length] = '\0';
// 解析
DynamicJsonDocument doc(1024);
DeserializationError error = deserializeJson(doc, jsonBuffer);
if (!error) {
const char* command = doc["command"];
JsonArray values = doc["values"];
Serial.print("命令: ");
Serial.println(command);
Serial.print("值数量: ");
Serial.println(values.size());
}
delete[] jsonBuffer; // 释放内存
}

步骤 5:使用类方法的回调(面向对象)

Section titled “步骤 5:使用类方法的回调(面向对象)”

适用于使用类的进阶项目:

class MQTTHandler {
private:
PubSubClient* client;
public:
MQTTHandler(PubSubClient* mqttClient) : client(mqttClient) {
client->setCallback(std::bind(&MQTTHandler::onMessage, this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
}
void onMessage(char* topic, byte* payload, unsigned int length) {
// 现在这是类方法——可以访问成员变量
String message = "";
for (int i = 0; i < length; i++) message += (char)payload[i];
Serial.printf("[%s] %s\n", topic, message.c_str());
// 基于主题处理
processCommand(String(topic), message);
}
void processCommand(String topic, String message) {
// 类级别处理
}
};
  • 订阅的主题上有消息到达时,回调被调用
  • 基于主题的路由正确地将消息指向对应的处理函数
  • 回调中设置的全局标志在主循环中被处理
  • 大型 JSON 载荷成功被解析
  • 回调不阻塞 client.loop() 的处理

原因

  • client.connect() 之前未调用 client.setCallback(callback)
  • 主循环中未调用 client.loop()
  • 主题订阅失败

解决方案

  1. 验证在 setup() 中的 connect() 之前已调用 setCallback()
  2. 在回调内部添加 Serial 打印以确认已到达
  3. loop() 的开头调用 client.loop(),而不是在延时之后
  4. 使用 MQTT Explorer 验证主题和载荷是否正确

原因:载荷包含 null 字节或二进制数据。

解决方案:将载荷转换为字符串时,检查 null 字节:

String message = "";
for (unsigned int i = 0; i < length; i++) {
if (payload[i] != '\0') { // 跳过 null 字节
message += (char)payload[i];
}
}

原因:同一次循环迭代中多次调用 client.loop()

解决方案:确保每次 loop() 迭代只调用一次 client.loop()。移除任何额外的调用。

  • 保持回调快速执行:绝不在回调中使用 delay()、长循环或阻塞操作
  • 对慢操作使用标志变量:在回调中设置标志,在 loop() 中执行操作
  • 在路由前先转换载荷:将 byte* 转换为 String 一次,而不是在每个处理函数中都转换
  • 使用 Serial 记录传入消息:有助于在开发过程中诊断问题
  • 使用主题前缀组织项目/设备/动作 结构使路由更清晰
  • 考虑对标志使用 volatile:当在回调中设置标志而在 loop() 中读取时
  1. 回调函数接收主题、载荷和长度——先将载荷转换为字符串
  2. 基于主题的路由将消息指向特定的处理函数
  3. 对长时间操作使用全局标志,以避免阻塞 client.loop()
  4. 使用 DynamicJsonDocument 在堆上处理大型载荷
  5. 回调必须快速执行——它在 MQTT 处理路径中运行
  6. 对于面向对象设计,可以通过 std::bind 实现基于类的回调