MQTT In/Out 节点
MQTT In/Out 节点
本节介绍 Node-RED 的 MQTT 输入和输出节点。学习完成后,您将能够:
- 使用 MQTT In 节点订阅传感器数据
- 使用 MQTT Out 节点控制设备
- 配置 QoS、Topic 等核心参数
- 构建完整的 MQTT 收发 Flow
MQTT In Node
Section titled “MQTT In Node”MQTT In 节点订阅 MQTT Topic,接收消息:
[Mosquitto Broker] ──→ [MQTT In] ──→ [Function/Other Nodes]| 参数 | 说明 | 推荐值 |
|---|---|---|
| Topic | 订阅的主题 | sensor/# |
| QoS | 服务质量等级 | 0/1/2 |
| Output | 输出格式 | Auto/JSON/Buffer/String |
| Broker | 选择的 MQTT Broker | (已配置) |
| Name | 节点名称 | Temperature Sensor |
Topic 配置示例
Section titled “Topic 配置示例”# 单 Topictopic: "sensor/temperature"
# 通配符topic: "sensor/#" # 所有 sensor/ 下主题topic: "factory/+/temperature" # 单级通配符
# 多 Topic (用逗号分隔)topic: "sensor/temperature,sensor/humidity"MQTT Out Node
Section titled “MQTT Out Node”MQTT Out 节点发布消息到 MQTT Topic:
[Function/Other Nodes] ──→ [MQTT Out] ──→ [Mosquitto Broker]| 参数 | 说明 | 推荐值 |
|---|---|---|
| Topic | 发布的主题 | device/control |
| QoS | 服务质量等级 | 1 |
| Retain | 保留消息 | false |
| Broker | 选择的 MQTT Broker | (已配置) |
| Name | 节点名称 | Control Command |
Basic Usage
Section titled “Basic Usage”接收传感器数据
Section titled “接收传感器数据”[MQTT In: sensor/#] → [Function: 解析数据] → [InfluxDB Out: 存储] ↓ [Debug: 监控]// Function 节点: 解析传感器数据var sensorData = { device: msg.topic.split('/')[1], value: parseFloat(msg.payload), timestamp: Date.now()};
msg.payload = sensorData;return msg;发送控制命令
Section titled “发送控制命令”[Inject: 触发] → [Function: 构建命令] → [MQTT Out: device/control]// Function 节点: 构建控制命令msg.payload = { command: "cooling_on", target_temp: 25, duration: 3600, timestamp: Date.now()};
msg.topic = "factory/device/AC-01/command";msg.qos = 1;msg.retain = false;
return msg;Advanced Usage
Section titled “Advanced Usage”双向通信 Flow
Section titled “双向通信 Flow”完整的设备双向通信示例:
┌─────────────────────────────────────────────────────────────────┐│ Node-RED Flow │├─────────────────────────────────────────────────────────────────┤│ ││ 数据接收: ││ [MQTT In: device/+/data] → [Function: 处理] → [InfluxDB Out] ││ ││ 命令发送: ││ [Inject: 触发] → [Function: 构建] → [MQTT Out: device/+/cmd] ││ ││ 状态确认: ││ [MQTT In: device/+/status] → [Function: 验证] → [Debug: 结果] ││ │└─────────────────────────────────────────────────────────────────┘// 发送命令并等待确认// 步骤 1: 发送命令var commandMsg = { topic: "device/" + deviceId + "/command", payload: JSON.stringify({ cmd: "reboot", timestamp: Date.now() }), qos: 1, retain: false};
// 步骤 2: 保存到 context 用于确认var pendingCommands = context.get("pendingCommands") || {};pendingCommands[deviceId] = { command: "reboot", sentAt: Date.now(), status: "pending"};context.set("pendingCommands", pendingCommands);
node.send(commandMsg);Topic 动态配置
Section titled “Topic 动态配置”// 在 Function 节点中动态设置 Topicvar deviceId = msg.payload.device || "unknown";
// 设置发布 Topicmsg.topic = "factory/" + deviceId + "/command";
// 设置订阅 Topic(通过后续 MQTT In 节点)// 建议使用通配符订阅固定前缀
// 发送控制消息msg.payload = JSON.stringify({ relay: msg.payload.relay, state: msg.payload.state, timestamp: Date.now()});
return msg;QoS Configuration
Section titled “QoS Configuration”| QoS | 说明 | 可靠性 | 性能 | 推荐场景 |
|---|---|---|---|---|
| 0 | 最多一次 | 最低 | 最快 | 高频传感器数据 |
| 1 | 至少一次 | 中等 | 中等 | 设备控制指令 |
| 2 | 恰好一次 | 最高 | 最慢 | 关键告警消息 |
QoS 选择建议
Section titled “QoS 选择建议”// 传感器数据 (高频,允许丢失)msg.qos = 0;msg.topic = "sensor/temperature";
// 控制指令 (必须到达)msg.qos = 1;msg.topic = "device/AC-01/command";msg.retain = false;
// 告警消息 (必须且仅需一次)msg.qos = 2;msg.topic = "alarm/critical";msg.retain = true;Retained Messages
Section titled “Retained Messages”Retained 消息保留最新值,新订阅者立即收到:
// 发布保留消息msg.topic = "device/status";msg.payload = JSON.stringify({ device: "SENSOR-01", status: "online", lastSeen: Date.now()});msg.retain = true;
return msg;
// 新订阅者连接后立即收到保留消息// 无需等待设备重新发送Common Patterns
Section titled “Common Patterns”Pattern 1: MQTT 桥接
Section titled “Pattern 1: MQTT 桥接”// 在两个 Broker 之间转发消息// MQTT In (Broker A) → Function → MQTT Out (Broker B)
// Function: 转发逻辑if (msg.payload && msg.topic) { // 转发到另一个 Broker // 修改 Topic 前缀 msg.topic = "bridge/" + msg.topic; return msg;}return null;Pattern 2: 消息过滤
Section titled “Pattern 2: 消息过滤”// 只转发特定类型的消息var topic = msg.topic;var payload = msg.payload;
// 过滤: 只转发温度数据if (topic.includes("temperature")) { msg.qos = 0; return msg;}
// 报警数据立即转发if (payload > 40) { msg.qos = 1; msg.topic = "alert/" + topic; return msg;}
return null; // 丢弃其他消息✅ 推荐做法:
- 使用有意义的 Topic 结构(如
device/{id}/{type}) - 传感器数据使用 QoS 0,控制命令使用 QoS 1
- 设备状态使用 Retained 消息
- MQTT 输出节点前用 Function 节点验证数据
- 使用 Debug 节点监控 MQTT 通信
❌ 避免做法:
- Topic 使用无意义的命名
- 控制命令使用 QoS 0(可能丢失)
- 高频数据使用 Retained(增加 Broker 负载)
- 大型 payload 通过 MQTT 传输
- 订阅过于宽泛的 Topic
Summary
Section titled “Summary”- MQTT In 节点订阅接收传感器数据
- MQTT Out 节点发送控制命令
- QoS 0/1/2 满足不同可靠性需求
- Retained 消息提供设备状态快照
- 动态 Topic 实现灵活的设备路由