跳转到内容

Node-RED 控制流程设计

Node-RED 控制流程设计

本节介绍如何在 Node-RED 中设计完整的智能继电器控制 Flow。学习完成后,您将能够:

  • 设计 MQTT 控制命令的发送 Flow
  • 实现 Tasmota 和 Shelly 设备的开关控制
  • 创建带有定时和条件逻辑的自动化控制
  • 构建设备状态反馈回路

最简单的控制流:

[Inject: ON] ──→ [MQTT Out: cmnd/tasmota/socket1/POWER]
[Inject: OFF] ──→ [MQTT Out: cmnd/tasmota/socket1/POWER]
// Inject 节点 1 (ON)
{
"name": "Turn ON",
"payload": "ON",
"payloadType": "str",
"repeat": "",
"cron": ""
}
// Inject 节点 2 (OFF)
{
"name": "Turn OFF",
"payload": "OFF",
"payloadType": "str",
"repeat": "",
"cron": ""
}
// MQTT Out 节点
{
"name": "Tasmota Power",
"topic": "cmnd/tasmota/socket1/POWER",
"qos": 1,
"retain": false
}
[Inject] ──→ [Function: RPC 命令] ──→ [MQTT Out: shellyplus/rpc]
[MQTT In: nodered/rpc] ←─ 响应
// Function: 构建 Shelly RPC 控制命令
var action = msg.payload; // true = 开, false = 关
// 生成唯一请求 ID
var requestId = Math.floor(Math.random() * 100000);
msg.payload = {
id: requestId,
src: "nodered",
method: "Switch.Set",
params: {
id: 0,
on: action
}
};
// 保存到 Context 用于匹配响应
var pendingRequests = context.get("pendingRequests") || {};
pendingRequests[requestId] = {
action: action,
timestamp: Date.now()
};
context.set("pendingRequests", pendingRequests);
// 设置 Topic
msg.topic = "shellyplus1pm-ABC123/rpc";
return msg;
// Function: 基于时间的控制逻辑
// 在指定时间自动开关设备
var now = new Date();
var hour = now.getHours();
var minute = now.getMinutes();
// 工作时间 (09:00 - 18:00) 开启
// 非工作时间关闭
var isWorkTime = (hour >= 9 && hour < 18);
var currentState = msg.payload.output || false;
var deviceId = "shellyplus1pm-ABC123";
// 只在状态需要变化时执行
if (isWorkTime && !currentState) {
// 需要开启
msg.payload = {
id: 1,
src: "nodered",
method: "Switch.Set",
params: { id: 0, on: true }
};
msg.topic = deviceId + "/rpc";
return msg;
} else if (!isWorkTime && currentState) {
// 需要关闭
msg.payload = {
id: 2,
src: "nodered",
method: "Switch.Set",
params: { id: 0, on: false }
};
msg.topic = deviceId + "/rpc";
return msg;
}
// 状态无需变化,不发送命令
return null;
// Function: 当功率过高时自动关闭
// 输入: 来自能耗监测 Flow 的数据
var power = msg.payload.power;
var deviceId = msg.payload.device || "unknown";
// 功率阈值
var POWER_LIMIT = 2000; // 2000W
if (power > POWER_LIMIT) {
// 超过阈值,发送关闭命令
node.warn("Power limit exceeded: " + power + "W on " + deviceId);
// 构建 RPC 关闭命令
msg.payload = {
id: Date.now(),
src: "nodered",
method: "Switch.Set",
params: {
id: 0,
on: false
}
};
msg.topic = deviceId + "/rpc";
// 标记为告警
msg.alert = {
type: "power_limit",
device: deviceId,
power: power,
limit: POWER_LIMIT,
timestamp: Date.now()
};
return msg;
}
return null; // 功率正常,不处理
// Function: 批量控制 Flow 的输出
// 一个 Inject 触发多个设备
var action = msg.payload; // true/false
// 设备列表
var devices = [
"shellyplus1pm-LINE1",
"shellyplus1pm-LINE2",
"shellyplus1pm-LIGHTING"
];
var requestId = Date.now();
var messages = [];
devices.forEach(function(deviceId, index) {
messages.push({
topic: deviceId + "/rpc",
payload: {
id: requestId + index,
src: "nodered/batch",
method: "Switch.Set",
params: {
id: 0,
on: action
}
}
});
});
// 发送多条消息
return [messages];

完整控制流程应包含状态确认:

┌───────────────────────────────────────────────┐
│ 控制流程 │
├───────────────────────────────────────────────┤
│ │
│ 1. [Inject/Trigger] │
│ │ │
│ ▼ │
│ 2. [Function: 构建命令] │
│ │ │
│ ├──→ [MQTT Out: 发送到设备] │
│ │ │
│ ▼ │
│ 3. [MQTT In: 订阅设备反馈] │
│ │ │
│ ▼ │
│ 4. [Function: 验证执行结果] │
│ │ │
│ ├──→ [Debug: 成功日志] │
│ │ │
│ └──→ [Function: 重试/告警] (失败时) │
│ │
└───────────────────────────────────────────────┘
// Function: 验证命令执行结果
// 输入: Shelly Gen2 RPC 响应
var result = msg.payload;
if (result && result.result) {
var wasOn = result.result.was_on;
var currentState = msg.payload.output;
node.status({
fill: currentState ? "green" : "red",
shape: "dot",
text: currentState ? "ON" : "OFF"
});
msg.payload = {
device: msg.topic.split('/')[0],
command: "Switch.Set",
success: true,
previous_state: wasOn,
current_state: currentState,
timestamp: Date.now()
};
return msg;
}
// 执行失败,触发告警
msg.payload = {
device: msg.topic.split('/')[0],
command: "Switch.Set",
success: false,
error: "No response from device",
timestamp: Date.now()
};
return msg;
Terminal window
# 1. 测试 Tasmota 开关控制
mosquitto_pub -t "cmnd/tasmota/socket1/POWER" -m "ON"
mosquitto_sub -t "stat/tasmota/socket1/POWER" -C 1
# 预期: ON
# 2. 测试 Shelly RPC
mosquitto_pub -t "shellyplus1pm-ABC123/rpc" -m '{"id":1,"src":"test","method":"Switch.Set","params":{"id":0,"on":true}}'
mosquitto_sub -t "test/rpc" -C 1
# 预期: {"id":1,"src":"shellyplus1pm-ABC123","result":{"was_on":false,"restart_required":false}}

Q1: 如何确保控制命令被设备正确执行?

Section titled “Q1: 如何确保控制命令被设备正确执行?”

建议实现控制-确认的双向通信:发送命令后,订阅设备的状态反馈 Topic,在 Function 节点中验证执行结果。如果超时未收到确认,触发重试或告警。

在 Node-RED 中设置超时检测:

// 发送命令后保存到 Context
// 使用定时器检查是否在 5 秒内收到响应

推荐做法:

  • 控制命令使用 QoS 1 确保至少一次投递
  • 实现命令-确认的双向通信模式
  • 使用 Context 管理待确认的命令
  • 批量控制时使用 Promise.all 模式

避免做法:

  • 不验证命令执行结果
  • 短时间内频繁发送控制命令
  • 忽略 Qos 设置(默认 QoS 0 可能丢失命令)
  • 控制逻辑和数据处理 Mix 在同一个 Flow
  1. Tasmota 控制通过 cmnd/{topic}/POWER Topic 发送 ON/OFF
  2. Shelly Gen2 控制通过 RPC over MQTT 的 JSON-RPC 协议
  3. 定时控制结合 Inject 节点的 cron 功能实现
  4. 条件控制基于能耗数据自动决策
  5. 状态确认通过反馈 Topic 验证命令执行结果