跳转到内容

QoS 1(至少一次)

本节介绍 MQTT QoS 1(至少一次送达)的工作原理、确认机制和适用场景。学习完成后,您将能够:

  • 理解 QoS 1 的 PUBACK 确认机制
  • 掌握 QoS 1 的消息重试和去重处理
  • 确定最适合使用 QoS 1 的业务场景
  • 在 Node-RED 和 ESP32 中配置 QoS 1

在开始本节之前,请确保:

  • 理解 QoS 0 的基本概念
  • Mosquitto Broker 已运行
  • 已安装 MQTT 命令行工具

QoS 1(At least once,至少一次)确保消息至少被送达一次,通过确认和重试机制实现。

核心特点

  • 发送者收到 PUBACK 确认后才认为消息投递完成
  • 如果未收到确认,发送者会重试
  • 消息可能重复送达
  • 需要存储消息状态直到收到确认
Publisher Broker Subscriber
│ │ │
│─── PUBLISH ──────────→│ │
│ (QoS=1, PacketID=1) │ │
│ │ │
│←─── PUBACK ───────────│ │
│ (PacketID=1) │ │
│ │─── PUBLISH ──────────→│
│ ✓ Delivery confirmed │ (QoS=1, PacketID=1)│
│ ✓ Can discard msg │ │
│ │←─── PUBACK ──────────│
│ │ (PacketID=1) │
│ │ │
│ Complete flow │ ✓ Delivery confirmed│

当发布者未收到 PUBACK 时的行为:

时间线:
T0: Publisher sends PUBLISH (QoS=1, PacketID=5)
[网络延迟或丢失]
T1: Publisher 等待 PUBACK (超时 = 重试间隔)
T2: Publisher 重新发送 PUBLISH (QoS=1, PacketID=5)
[可能原始消息已到达但 ACK 丢失]
T3: Broker 收到重复消息
→ 如果 PacketID 已处理过,丢弃重复
→ 否则正常处理并发送 PUBACK
T4: Publisher 收到 PUBACK
→ 确认投递完成,丢弃存储的消息
结果: 消息可能被处理多次(重复)
PUBLISH 报文 (从 Publisher 到 Broker):
┌─────────────────────────────────────────┐
│ Fixed Header: 0x32 (PUBLISH + QoS=1) │
│ Variable Header: │
│ Topic Length: 2 bytes │
│ Topic: "test/topic" │
│ Packet Identifier: 0001 │
│ Payload: "Hello MQTT QoS 1" │
└─────────────────────────────────────────┘
PUBACK 报文 (从 Broker 到 Publisher):
┌─────────────────────────────────────────┐
│ Fixed Header: 0x40 (PUBACK) │
│ Variable Header: │
│ Packet Identifier: 0001 │
└─────────────────────────────────────────┘

Broker 需要处理接收到的重复消息:

// Broker 内部去重逻辑(概念示例)
const packetStore = new Map();
function handlePublish(packet) {
const key = `${packet.clientId}:${packet.packetId}`;
if (packetStore.has(key)) {
// 重复消息,只重新发送 PUBACK,不处理数据
sendPuback(packet.clientId, packet.packetId);
return;
}
// 新消息,存储 PacketID
packetStore.set(key, true);
// 处理消息
deliverToSubscribers(packet);
// 发送确认
sendPuback(packet.clientId, packet.packetId);
}
场景推荐 QoS理由
设备控制命令(开关灯)QoS 1控制命令不能丢失,重复执行可接受
告警通知QoS 1告警必须到达,多一次告警可接受
配置更新QoS 1配置必须应用,重复应用不影响结果
设备在线状态QoS 1状态更新必须可靠
门禁事件QoS 1事件需记录,重复记录可接受
场景推荐 QoS理由
高频传感器数据QoS 0QoS 1 额外开销不必要
支付交易QoS 2重复交易不可接受
库存计数QoS 2重复计算会导致错误
计费数据QoS 2必须精确一次
#include <PubSubClient.h>
// 使用 QoS 1 发布控制命令
void publishControlCommand(String device, String command) {
char payload[64];
snprintf(payload, 64,
"{\"device\":\"%s\",\"command\":\"%s\"}",
device.c_str(), command.c_str());
// publish(topic, payload, retained) → QoS 0
// publish_P(topic, payload, length, retained) → QoS 0
// publish(topic, payload, length, retained) → QoS 0
//
// 正确方式:
// publish(topic, (uint8_t*)payload, length, retained)
// 最后一个布尔参数是 retain,不是 QoS!
//
// PubSubClient 的 publish 方法有多个重载:
// publish(topic, payload) → QoS 0
// publish(topic, payload, retained) → QoS 0
// publish_P(topic, payload, len, retained) → QoS 0
// 要使用 QoS 1,需要使用带 QoS 参数的方法
// PubSubClient 不支持直接选择 QoS 级别!
// 需要使用 mqttClient.publish(topic, payload) 总是 QoS 0
//
// 或者使用 publish 重载:
// boolean publish(const char* topic, const uint8_t* payload,
// unsigned int plength, bool retained)
// 这个版本总是 QoS 0
//
// **PubSubClient 默认使用 QoS 0**
// 如果需要 QoS 1,需要使用 AsyncMqttClient 或其他支持 QoS 的库
}

注意:PubSubClient 库默认只支持 QoS 0。如果需要 QoS 1,推荐使用 AsyncMqttClientMQTT 库。

使用 AsyncMqttClient 实现 QoS 1

#include <AsyncMqttClient.h>
AsyncMqttClient mqttClient;
void connectToMQTT() {
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onPublish(onMqttPublish);
mqttClient.connect(MQTT_BROKER, 1883);
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT");
// 使用 QoS 1 发布
mqttClient.publish(
"devices/control", // topic
1, // QoS
false, // retain
"{\"cmd\":\"toggle\"}" // payload
);
}
// 发布完成回调
void onMqttPublish(uint16_t packetId) {
Serial.print("Publish confirmed, packet ID: ");
Serial.println(packetId);
}
MQTT Out 节点配置:
├── Server: [MQTT Broker]
├── Topic: devices/control
├── QoS: 1 (At least once) ← 选择
├── Retain: false
└── Name: Device Control
MQTT In 节点配置:
├── Server: [MQTT Broker]
├── Topic: devices/status
├── QoS: 1 (At least once)
├── Output: a parsed JSON object
└── Name: Device Status
Terminal window
# 发布 QoS 1 消息
mosquitto_pub -h localhost -t "control/device1" \
-q 1 -m '{"power": "on"}'
# 订阅 QoS 1
mosquitto_sub -h localhost -t "control/device1" \
-q 1 -v
# 查看 QoS 级别
mosquitto_sub -h localhost -t "#" -v -q 1
指标QoS 0QoS 1相比 QoS 0 的变化
消息大小最小+2 字节Packet ID 字段
网络往返01 次确认增加一次 TCP 往返
发布者状态存储直到收到 PUBACK内存增加
消息延迟< 1ms1-2ms增加 1-2ms
吞吐量~50,000/s~20,000/s降低 60%
带宽消耗基准+300%含重试流量
Terminal window
# 测试确认机制
# 终端 1: 订阅(使用 -q 1)
mosquitto_sub -h localhost -t "qos1/test" -q 1 -v
# 终端 2: 发布(使用 -q 1)
mosquitto_pub -h localhost -t "qos1/test" -q 1 -m "QoS 1 test"
# 查看 Broker 日志验证 PUBACK
docker logs mosquitto --tail 20
# 应看到类似:
# New client connected as ...
# Received PUBLISH from ... (d0, q1, r0, m1, ...)
# Sending PUBACK to ... (m1)
  • 推荐: 控制命令和告警使用 QoS 1
  • 推荐: 在应用层实现去重逻辑(如果重复会造成问题)
  • 推荐: 设置合理的重试间隔(建议 5-10 秒)
  • 避免: 对不可重复执行的操作使用 QoS 1(如”加一”操作)
  • 避免: 高频发布的场景使用 QoS 1(传感器数据用 QoS 0)
  • 避免: 无限重试(设置最大重试次数)

本节要点总结:

  1. QoS 1 机制:发送 → 等待 PUBACK → 未确认则重试
  2. 可靠性:至少一次送达,可能重复
  3. 性能影响:相比 QoS 0,吞吐量降低约 60%,延迟增加 1-2ms
  4. 适用场景:控制命令、告警通知、设备状态更新
  5. 去重要求:应用层需要考虑消息重复的可能性