跳转到内容

MQTT In/Out 节点

MQTT In/Out 节点

本节介绍 Node-RED 的 MQTT 输入和输出节点。学习完成后,您将能够:

  • 使用 MQTT In 节点订阅传感器数据
  • 使用 MQTT Out 节点控制设备
  • 配置 QoS、Topic 等核心参数
  • 构建完整的 MQTT 收发 Flow

MQTT In 节点订阅 MQTT Topic,接收消息:

[Mosquitto Broker] ──→ [MQTT In] ──→ [Function/Other Nodes]
参数说明推荐值
Topic订阅的主题sensor/#
QoS服务质量等级0/1/2
Output输出格式Auto/JSON/Buffer/String
Broker选择的 MQTT Broker(已配置)
Name节点名称Temperature Sensor
Terminal window
# 单 Topic
topic: "sensor/temperature"
# 通配符
topic: "sensor/#" # 所有 sensor/ 下主题
topic: "factory/+/temperature" # 单级通配符
# 多 Topic (用逗号分隔)
topic: "sensor/temperature,sensor/humidity"

MQTT Out 节点发布消息到 MQTT Topic:

[Function/Other Nodes] ──→ [MQTT Out] ──→ [Mosquitto Broker]
参数说明推荐值
Topic发布的主题device/control
QoS服务质量等级1
Retain保留消息false
Broker选择的 MQTT Broker(已配置)
Name节点名称Control Command
[MQTT In: sensor/#] → [Function: 解析数据] → [InfluxDB Out: 存储]
[Debug: 监控]
// Function 节点: 解析传感器数据
var sensorData = {
device: msg.topic.split('/')[1],
value: parseFloat(msg.payload),
timestamp: Date.now()
};
msg.payload = sensorData;
return msg;
[Inject: 触发] → [Function: 构建命令] → [MQTT Out: device/control]
// Function 节点: 构建控制命令
msg.payload = {
command: "cooling_on",
target_temp: 25,
duration: 3600,
timestamp: Date.now()
};
msg.topic = "factory/device/AC-01/command";
msg.qos = 1;
msg.retain = false;
return msg;

完整的设备双向通信示例:

┌─────────────────────────────────────────────────────────────────┐
│ Node-RED Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 数据接收: │
│ [MQTT In: device/+/data] → [Function: 处理] → [InfluxDB Out] │
│ │
│ 命令发送: │
│ [Inject: 触发] → [Function: 构建] → [MQTT Out: device/+/cmd] │
│ │
│ 状态确认: │
│ [MQTT In: device/+/status] → [Function: 验证] → [Debug: 结果] │
│ │
└─────────────────────────────────────────────────────────────────┘
// 发送命令并等待确认
// 步骤 1: 发送命令
var commandMsg = {
topic: "device/" + deviceId + "/command",
payload: JSON.stringify({
cmd: "reboot",
timestamp: Date.now()
}),
qos: 1,
retain: false
};
// 步骤 2: 保存到 context 用于确认
var pendingCommands = context.get("pendingCommands") || {};
pendingCommands[deviceId] = {
command: "reboot",
sentAt: Date.now(),
status: "pending"
};
context.set("pendingCommands", pendingCommands);
node.send(commandMsg);
// 在 Function 节点中动态设置 Topic
var deviceId = msg.payload.device || "unknown";
// 设置发布 Topic
msg.topic = "factory/" + deviceId + "/command";
// 设置订阅 Topic(通过后续 MQTT In 节点)
// 建议使用通配符订阅固定前缀
// 发送控制消息
msg.payload = JSON.stringify({
relay: msg.payload.relay,
state: msg.payload.state,
timestamp: Date.now()
});
return msg;
QoS说明可靠性性能推荐场景
0最多一次最低最快高频传感器数据
1至少一次中等中等设备控制指令
2恰好一次最高最慢关键告警消息
// 传感器数据 (高频,允许丢失)
msg.qos = 0;
msg.topic = "sensor/temperature";
// 控制指令 (必须到达)
msg.qos = 1;
msg.topic = "device/AC-01/command";
msg.retain = false;
// 告警消息 (必须且仅需一次)
msg.qos = 2;
msg.topic = "alarm/critical";
msg.retain = true;

Retained 消息保留最新值,新订阅者立即收到:

// 发布保留消息
msg.topic = "device/status";
msg.payload = JSON.stringify({
device: "SENSOR-01",
status: "online",
lastSeen: Date.now()
});
msg.retain = true;
return msg;
// 新订阅者连接后立即收到保留消息
// 无需等待设备重新发送
// 在两个 Broker 之间转发消息
// MQTT In (Broker A) → Function → MQTT Out (Broker B)
// Function: 转发逻辑
if (msg.payload && msg.topic) {
// 转发到另一个 Broker
// 修改 Topic 前缀
msg.topic = "bridge/" + msg.topic;
return msg;
}
return null;
// 只转发特定类型的消息
var topic = msg.topic;
var payload = msg.payload;
// 过滤: 只转发温度数据
if (topic.includes("temperature")) {
msg.qos = 0;
return msg;
}
// 报警数据立即转发
if (payload > 40) {
msg.qos = 1;
msg.topic = "alert/" + topic;
return msg;
}
return null; // 丢弃其他消息

推荐做法:

  • 使用有意义的 Topic 结构(如 device/{id}/{type}
  • 传感器数据使用 QoS 0,控制命令使用 QoS 1
  • 设备状态使用 Retained 消息
  • MQTT 输出节点前用 Function 节点验证数据
  • 使用 Debug 节点监控 MQTT 通信

避免做法:

  • Topic 使用无意义的命名
  • 控制命令使用 QoS 0(可能丢失)
  • 高频数据使用 Retained(增加 Broker 负载)
  • 大型 payload 通过 MQTT 传输
  • 订阅过于宽泛的 Topic
  1. MQTT In 节点订阅接收传感器数据
  2. MQTT Out 节点发送控制命令
  3. QoS 0/1/2 满足不同可靠性需求
  4. Retained 消息提供设备状态快照
  5. 动态 Topic 实现灵活的设备路由