跳转到内容

Node-RED Flow 设计

Node-RED Flow 设计

本节介绍如何在 Node-RED 中设计数据处理 Flow,包括 MQTT 输入、数据解析、处理逻辑和输出。学习完成后,您将能够:

  • 创建完整的 MQTT 数据接收 Flow
  • 使用 Function 节点解析和转换数据
  • 实现迟滞控制算法
  • 组织和导出可复用的 Flow

在开始本节之前,请确保:

  • Node-RED 已安装并运行
  • MQTT Broker 已配置
  • ESP32 已开始通过 MQTT 发送数据
  • 了解 Node-RED 基本操作
[MQTT In] → [JSON Parse] → [Switch] → [InfluxDB Out]
| | |
| | +→ [Hysteresis] → [MQTT Out] (风扇控制)
| | |
| | +→ [Alert Check] → [Notification]
| |
+→ [Function] → [Debug] (调试输出)

将 Flow 分为多个功能组,便于管理:

┌─ Group 1: Data Reception ──────────────────┐
│ [mqtt in] → [json] → [switch] │
│ 接收原始数据 解析JSON 分流到各处理模块 │
└────────────────────────────────────────────┘
┌─ Group 2: Data Storage ────────────────────┐
│ [influxdb out] │
│ 存储到 InfluxDB 时序数据库 │
└────────────────────────────────────────────┘
┌─ Group 3: Fan Control ─────────────────────┐
│ [hysteresis] → [mqtt out] │
│ 迟滞控制器 发送控制命令到 ESP32 │
└────────────────────────────────────────────┘
┌─ Group 4: Alert System ────────────────────┐
│ [switch] → [function] → [http request] │
│ 阈值判断 格式化消息 发送通知 │
└────────────────────────────────────────────┘

添加 MQTT 输入节点接收传感器数据:

1. 从左侧节点面板拖入 "mqtt in" 节点
2. 双击节点进行配置:
- Server: 选择已配置的 MQTT Broker
- Topic: factory/zone1/environment
- QoS: 1
- Output: a parsed JSON object
3. 点击 "Done" 完成配置

添加 JSON 节点解析 MQTT 消息:

1. 拖入 "json" 节点
2. 双击配置:
- Action: Convert between JSON string & Object
3. 连接 mqtt in 节点的输出到 json 节点的输入

添加 Function 节点处理解析后的数据:

// Function 节点:数据处理和增强
// 输入: msg.payload = {device_id, temperature, humidity, lux}
// 输出: msg.payload = 增强后的数据对象
// 获取原始数据
var data = msg.payload;
// 添加处理时间戳
data.processed_at = Date.now();
// 添加数据质量标记
var quality = "good";
if (data.quality && data.quality.signal_rssi < -85) {
quality = "poor";
}
data.data_quality = quality;
// 计算体感温度 (简化公式)
if (data.temperature && data.humidity) {
var t = data.temperature.value;
var h = data.humidity.value;
// 简化 Heat Index 计算
data.feels_like = {
value: t + 0.1 * (h - 50) * 0.1,
unit: "celsius"
};
}
// 日志输出
node.log("Processed data from: " + data.device_id);
return msg;

使用 Switch 节点根据条件分流数据:

msg.payload.temperature.value
// Switch 节点配置
// Condition: >= 30 (高温告警)
// Output 1: 正常数据处理
// Output 2: 高温告警处理
// 或者使用多个规则:
// Rule 1: msg.payload.temperature.value >= 30 → Output 1 (告警)
// Rule 2: otherwise → Output 2 (正常)

迟滞控制是一种两点控制器,避免系统在阈值附近频繁切换:

安装 hysteresis 节点:
1. 点击右上角菜单 → Manage Palette
2. 选择 Install 标签
3. 搜索 "node-red-contrib-hysteresis"
4. 点击 Install

Hysteresis 节点配置

双击 hysteresis 节点:
- Upper threshold: 29 (温度超过 29°C 开启风扇)
- Lower threshold: 27 (温度低于 27°C 关闭风扇)
- Output on high (above upper): true
- Output on low (below lower): false
- Input field: payload
- Output field: payload

迟滞控制原理

温度 (°C)
30 │ ░░░░░░
29 ├─────────░──↑─── 上限阈值 (风扇开启)
28 │ ░ ░
27 ├─────────↓──░─── 下限阈值 (风扇关闭)
26 │ ░░░░░░ ░
└────────────────────→ 时间
风扇关闭 风扇开启
1. 拖入 "mqtt out" 节点
2. 配置:
- Server: 选择 MQTT Broker
- Topic: factory/zone1/environment/control
- QoS: 1
- Retain: false
3. 连接到 hysteresis 节点的输出

添加 Debug 节点监控各个处理阶段:

1. 在关键位置添加 Debug 节点:
- MQTT 输入后:查看原始数据
- JSON 解析后:验证解析结果
- Function 节点后:检查处理后的数据
- Hysteresis 后:监控控制信号
2. 配置 Debug 输出:
- Output: complete msg object
- 或: selected property

以下是可以直接导入 Node-RED 的完整 Flow:

[
{
"id": "mqtt_in_1",
"type": "mqtt in",
"z": "flow1",
"name": "Environment Data",
"topic": "factory/zone1/environment",
"qos": "1",
"broker": "broker_1",
"x": 120,
"y": 100,
"wires": [["json_1"]]
},
{
"id": "json_1",
"type": "json",
"z": "flow1",
"name": "Parse JSON",
"property": "payload",
"action": "obj",
"x": 290,
"y": 100,
"wires": [["function_1", "debug_1"]]
},
{
"id": "function_1",
"type": "function",
"z": "flow1",
"name": "Process Data",
"func": "// 增强数据处理\nvar data = msg.payload;\ndata.processed_at = Date.now();\ndata.data_quality = \"good\";\nreturn msg;",
"outputs": 1,
"x": 470,
"y": 100,
"wires": [["switch_1"]]
},
{
"id": "switch_1",
"type": "switch",
"z": "flow1",
"name": "Route by Temp",
"property": "payload.temperature.value",
"rules": [
{ "t": "gte", "v": "30", "checkall": "true" }
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 650,
"y": 100,
"wires": [["hysteresis_1"], ["influx_out_1"]]
},
{
"id": "hysteresis_1",
"type": "hysteresis",
"z": "flow1",
"name": "Fan Hysteresis",
"property": "payload",
"upper": 29,
"lower": 27,
"outputs": 1,
"x": 830,
"y": 80,
"wires": [["mqtt_out_1"]]
},
{
"id": "mqtt_out_1",
"type": "mqtt out",
"z": "flow1",
"name": "Fan Control",
"topic": "factory/zone1/environment/control",
"qos": "1",
"retain": "false",
"broker": "broker_1",
"x": 1010,
"y": 80,
"wires": []
},
{
"id": "influx_out_1",
"type": "influxdb out",
"z": "flow1",
"name": "Store to InfluxDB",
"influxdb": "influx_1",
"measurement": "environment",
"x": 1010,
"y": 140,
"wires": []
},
{
"id": "debug_1",
"type": "debug",
"z": "flow1",
"name": "Raw Data",
"active": true,
"tosidebar": true,
"console": false,
"x": 470,
"y": 160,
"wires": []
},
{
"id": "broker_1",
"type": "mqtt-broker",
"name": "Local MQTT",
"broker": "mosquitto",
"port": "1883",
"clientid": "",
"usetls": false,
"compatmode": true
},
{
"id": "influx_1",
"type": "influxdb",
"name": "InfluxDB",
"host": "influxdb",
"port": "8086",
"protocol": "http",
"database": "nodered",
"org": "organization",
"token": "",
"version": "2.0"
}
]
1. 部署 Flow (点击右上角 Deploy 按钮)
2. 打开 Debug 侧边栏 (右侧虫子图标)
3. 检查 Debug 输出是否显示传感器数据
4. 验证数据流转完整:
- mqtt in → json → function → switch → multiple outputs

如果 ESP32 不在线,可使用 Inject 节点模拟数据:

// Inject 节点中设置:
// Payload: JSON Object
// {
// "device_id": "ESP32_001",
// "temperature": {"value": 28.5, "unit": "celsius"},
// "humidity": {"value": 65, "unit": "percent"},
// "lux": {"value": 450, "unit": "lux"}
// }
  • MQTT 节点显示已连接(小圆点绿色)
  • Debug 输出显示原始传感器数据
  • JSON 解析成功,无错误输出
  • Function 节点正确处理数据
  • Switch 节点根据条件正确路由
  • Hysteresis 节点在阈值切换时正确输出
  • MQTT 输出节点发送控制命令到 ESP32
  • 推荐: 每个功能模块添加 Debug 节点方便排查问题
  • 推荐: 使用 Link In/Out 节点跨标签页连接 Flow
  • 推荐: 导出 Flow 时添加注释和分组说明
  • 避免: 在 Function 节点中使用无限循环或长时间运行的操作
  • 避免: 多个 Flow 直接修改同一个上下文变量引发竞态
  • 避免: 生产环境中保留过多的 Debug 节点(影响性能)

本节要点总结:

  1. Flow 架构:MQTT 接收 → JSON 解析 → 数据处理 → 分流输出
  2. 迟滞控制:Hysteresis 节点实现两点控制器,避免频繁切换
  3. 数据路由:Switch 节点根据条件(如温度阈值)分流数据
  4. 完整链路:从传感器数据采集到存储、控制、告警的一站式 Flow
  5. 可复用:导出 Flow JSON 可在不同项目间共享和重用