QoS 1(至少一次)
QoS 1(至少一次)
Section titled “QoS 1(至少一次)”本节介绍 MQTT QoS 1(至少一次送达)的工作原理、确认机制和适用场景。学习完成后,您将能够:
- 理解 QoS 1 的 PUBACK 确认机制
- 掌握 QoS 1 的消息重试和去重处理
- 确定最适合使用 QoS 1 的业务场景
- 在 Node-RED 和 ESP32 中配置 QoS 1
在开始本节之前,请确保:
- 理解 QoS 0 的基本概念
- Mosquitto Broker 已运行
- 已安装 MQTT 命令行工具
QoS 1 Overview
Section titled “QoS 1 Overview”What is QoS 1?
Section titled “What is QoS 1?”QoS 1(At least once,至少一次)确保消息至少被送达一次,通过确认和重试机制实现。
核心特点:
- 发送者收到 PUBACK 确认后才认为消息投递完成
- 如果未收到确认,发送者会重试
- 消息可能重复送达
- 需要存储消息状态直到收到确认
Message Flow
Section titled “Message Flow”Publisher Broker Subscriber │ │ │ │─── PUBLISH ──────────→│ │ │ (QoS=1, PacketID=1) │ │ │ │ │ │←─── PUBACK ───────────│ │ │ (PacketID=1) │ │ │ │─── PUBLISH ──────────→│ │ ✓ Delivery confirmed │ (QoS=1, PacketID=1)│ │ ✓ Can discard msg │ │ │ │←─── PUBACK ──────────│ │ │ (PacketID=1) │ │ │ │ │ Complete flow │ ✓ Delivery confirmed│Retry Mechanism
Section titled “Retry Mechanism”当发布者未收到 PUBACK 时的行为:
时间线:T0: Publisher sends PUBLISH (QoS=1, PacketID=5) [网络延迟或丢失]
T1: Publisher 等待 PUBACK (超时 = 重试间隔)
T2: Publisher 重新发送 PUBLISH (QoS=1, PacketID=5) [可能原始消息已到达但 ACK 丢失]
T3: Broker 收到重复消息 → 如果 PacketID 已处理过,丢弃重复 → 否则正常处理并发送 PUBACK
T4: Publisher 收到 PUBACK → 确认投递完成,丢弃存储的消息
结果: 消息可能被处理多次(重复)Packet Flow Detail
Section titled “Packet Flow Detail”PUBLISH/PUBACK Exchange
Section titled “PUBLISH/PUBACK Exchange”PUBLISH 报文 (从 Publisher 到 Broker):┌─────────────────────────────────────────┐│ Fixed Header: 0x32 (PUBLISH + QoS=1) ││ Variable Header: ││ Topic Length: 2 bytes ││ Topic: "test/topic" ││ Packet Identifier: 0001 ││ Payload: "Hello MQTT QoS 1" │└─────────────────────────────────────────┘
PUBACK 报文 (从 Broker 到 Publisher):┌─────────────────────────────────────────┐│ Fixed Header: 0x40 (PUBACK) ││ Variable Header: ││ Packet Identifier: 0001 │└─────────────────────────────────────────┘Duplicate Message Handling
Section titled “Duplicate Message Handling”Broker 需要处理接收到的重复消息:
// Broker 内部去重逻辑(概念示例)const packetStore = new Map();
function handlePublish(packet) { const key = `${packet.clientId}:${packet.packetId}`;
if (packetStore.has(key)) { // 重复消息,只重新发送 PUBACK,不处理数据 sendPuback(packet.clientId, packet.packetId); return; }
// 新消息,存储 PacketID packetStore.set(key, true);
// 处理消息 deliverToSubscribers(packet);
// 发送确认 sendPuback(packet.clientId, packet.packetId);}When to Use QoS 1
Section titled “When to Use QoS 1”Recommended Scenarios
Section titled “Recommended Scenarios”| 场景 | 推荐 QoS | 理由 |
|---|---|---|
| 设备控制命令(开关灯) | QoS 1 | 控制命令不能丢失,重复执行可接受 |
| 告警通知 | QoS 1 | 告警必须到达,多一次告警可接受 |
| 配置更新 | QoS 1 | 配置必须应用,重复应用不影响结果 |
| 设备在线状态 | QoS 1 | 状态更新必须可靠 |
| 门禁事件 | QoS 1 | 事件需记录,重复记录可接受 |
Not Recommended Scenarios
Section titled “Not Recommended Scenarios”| 场景 | 推荐 QoS | 理由 |
|---|---|---|
| 高频传感器数据 | QoS 0 | QoS 1 额外开销不必要 |
| 支付交易 | QoS 2 | 重复交易不可接受 |
| 库存计数 | QoS 2 | 重复计算会导致错误 |
| 计费数据 | QoS 2 | 必须精确一次 |
Implementation
Section titled “Implementation”ESP32 with PubSubClient
Section titled “ESP32 with PubSubClient”#include <PubSubClient.h>
// 使用 QoS 1 发布控制命令void publishControlCommand(String device, String command) { char payload[64]; snprintf(payload, 64, "{\"device\":\"%s\",\"command\":\"%s\"}", device.c_str(), command.c_str());
// publish(topic, payload, retained) → QoS 0 // publish_P(topic, payload, length, retained) → QoS 0 // publish(topic, payload, length, retained) → QoS 0 // // 正确方式: // publish(topic, (uint8_t*)payload, length, retained) // 最后一个布尔参数是 retain,不是 QoS! // // PubSubClient 的 publish 方法有多个重载: // publish(topic, payload) → QoS 0 // publish(topic, payload, retained) → QoS 0 // publish_P(topic, payload, len, retained) → QoS 0
// 要使用 QoS 1,需要使用带 QoS 参数的方法 // PubSubClient 不支持直接选择 QoS 级别! // 需要使用 mqttClient.publish(topic, payload) 总是 QoS 0 // // 或者使用 publish 重载: // boolean publish(const char* topic, const uint8_t* payload, // unsigned int plength, bool retained) // 这个版本总是 QoS 0 // // **PubSubClient 默认使用 QoS 0** // 如果需要 QoS 1,需要使用 AsyncMqttClient 或其他支持 QoS 的库}注意:PubSubClient 库默认只支持 QoS 0。如果需要 QoS 1,推荐使用
AsyncMqttClient或MQTT库。
使用 AsyncMqttClient 实现 QoS 1:
#include <AsyncMqttClient.h>
AsyncMqttClient mqttClient;
void connectToMQTT() { mqttClient.onConnect(onMqttConnect); mqttClient.onDisconnect(onMqttDisconnect); mqttClient.onPublish(onMqttPublish); mqttClient.connect(MQTT_BROKER, 1883);}
void onMqttConnect(bool sessionPresent) { Serial.println("Connected to MQTT");
// 使用 QoS 1 发布 mqttClient.publish( "devices/control", // topic 1, // QoS false, // retain "{\"cmd\":\"toggle\"}" // payload );}
// 发布完成回调void onMqttPublish(uint16_t packetId) { Serial.print("Publish confirmed, packet ID: "); Serial.println(packetId);}Node-RED Configuration
Section titled “Node-RED Configuration”MQTT Out 节点配置:├── Server: [MQTT Broker]├── Topic: devices/control├── QoS: 1 (At least once) ← 选择├── Retain: false└── Name: Device Control
MQTT In 节点配置:├── Server: [MQTT Broker]├── Topic: devices/status├── QoS: 1 (At least once)├── Output: a parsed JSON object└── Name: Device StatusMosquitto Command Line
Section titled “Mosquitto Command Line”# 发布 QoS 1 消息mosquitto_pub -h localhost -t "control/device1" \ -q 1 -m '{"power": "on"}'
# 订阅 QoS 1mosquitto_sub -h localhost -t "control/device1" \ -q 1 -v
# 查看 QoS 级别mosquitto_sub -h localhost -t "#" -v -q 1Performance Impact
Section titled “Performance Impact”QoS 1 Overhead
Section titled “QoS 1 Overhead”| 指标 | QoS 0 | QoS 1 | 相比 QoS 0 的变化 |
|---|---|---|---|
| 消息大小 | 最小 | +2 字节 | Packet ID 字段 |
| 网络往返 | 0 | 1 次确认 | 增加一次 TCP 往返 |
| 发布者状态 | 无 | 存储直到收到 PUBACK | 内存增加 |
| 消息延迟 | < 1ms | 1-2ms | 增加 1-2ms |
| 吞吐量 | ~50,000/s | ~20,000/s | 降低 60% |
| 带宽消耗 | 基准 | +300% | 含重试流量 |
验证 QoS 1 行为
Section titled “验证 QoS 1 行为”# 测试确认机制# 终端 1: 订阅(使用 -q 1)mosquitto_sub -h localhost -t "qos1/test" -q 1 -v
# 终端 2: 发布(使用 -q 1)mosquitto_pub -h localhost -t "qos1/test" -q 1 -m "QoS 1 test"
# 查看 Broker 日志验证 PUBACKdocker logs mosquitto --tail 20# 应看到类似:# New client connected as ...# Received PUBLISH from ... (d0, q1, r0, m1, ...)# Sending PUBACK to ... (m1)- ✅ 推荐: 控制命令和告警使用 QoS 1
- ✅ 推荐: 在应用层实现去重逻辑(如果重复会造成问题)
- ✅ 推荐: 设置合理的重试间隔(建议 5-10 秒)
- ❌ 避免: 对不可重复执行的操作使用 QoS 1(如”加一”操作)
- ❌ 避免: 高频发布的场景使用 QoS 1(传感器数据用 QoS 0)
- ❌ 避免: 无限重试(设置最大重试次数)
Summary
Section titled “Summary”本节要点总结:
- QoS 1 机制:发送 → 等待 PUBACK → 未确认则重试
- 可靠性:至少一次送达,可能重复
- 性能影响:相比 QoS 0,吞吐量降低约 60%,延迟增加 1-2ms
- 适用场景:控制命令、告警通知、设备状态更新
- 去重要求:应用层需要考虑消息重复的可能性