Node-RED Flow 设计
Node-RED Flow 设计
本节介绍如何在 Node-RED 中设计数据处理 Flow,包括 MQTT 输入、数据解析、处理逻辑和输出。学习完成后,您将能够:
- 创建完整的 MQTT 数据接收 Flow
- 使用 Function 节点解析和转换数据
- 实现迟滞控制算法
- 组织和导出可复用的 Flow
在开始本节之前,请确保:
- Node-RED 已安装并运行
- MQTT Broker 已配置
- ESP32 已开始通过 MQTT 发送数据
- 了解 Node-RED 基本操作
Flow Architecture
Section titled “Flow Architecture”Overall Flow Design
Section titled “Overall Flow Design”[MQTT In] → [JSON Parse] → [Switch] → [InfluxDB Out] | | | | | +→ [Hysteresis] → [MQTT Out] (风扇控制) | | | | | +→ [Alert Check] → [Notification] | | +→ [Function] → [Debug] (调试输出)Node Grouping
Section titled “Node Grouping”将 Flow 分为多个功能组,便于管理:
┌─ Group 1: Data Reception ──────────────────┐│ [mqtt in] → [json] → [switch] ││ 接收原始数据 解析JSON 分流到各处理模块 │└────────────────────────────────────────────┘
┌─ Group 2: Data Storage ────────────────────┐│ [influxdb out] ││ 存储到 InfluxDB 时序数据库 │└────────────────────────────────────────────┘
┌─ Group 3: Fan Control ─────────────────────┐│ [hysteresis] → [mqtt out] ││ 迟滞控制器 发送控制命令到 ESP32 │└────────────────────────────────────────────┘
┌─ Group 4: Alert System ────────────────────┐│ [switch] → [function] → [http request] ││ 阈值判断 格式化消息 发送通知 │└────────────────────────────────────────────┘Step-by-Step Flow Creation
Section titled “Step-by-Step Flow Creation”Step 1: MQTT Input Node
Section titled “Step 1: MQTT Input Node”添加 MQTT 输入节点接收传感器数据:
1. 从左侧节点面板拖入 "mqtt in" 节点2. 双击节点进行配置: - Server: 选择已配置的 MQTT Broker - Topic: factory/zone1/environment - QoS: 1 - Output: a parsed JSON object3. 点击 "Done" 完成配置Step 2: JSON Parsing
Section titled “Step 2: JSON Parsing”添加 JSON 节点解析 MQTT 消息:
1. 拖入 "json" 节点2. 双击配置: - Action: Convert between JSON string & Object3. 连接 mqtt in 节点的输出到 json 节点的输入Step 3: Function Node - Data Processing
Section titled “Step 3: Function Node - Data Processing”添加 Function 节点处理解析后的数据:
// Function 节点:数据处理和增强// 输入: msg.payload = {device_id, temperature, humidity, lux}// 输出: msg.payload = 增强后的数据对象
// 获取原始数据var data = msg.payload;
// 添加处理时间戳data.processed_at = Date.now();
// 添加数据质量标记var quality = "good";if (data.quality && data.quality.signal_rssi < -85) { quality = "poor";}data.data_quality = quality;
// 计算体感温度 (简化公式)if (data.temperature && data.humidity) { var t = data.temperature.value; var h = data.humidity.value; // 简化 Heat Index 计算 data.feels_like = { value: t + 0.1 * (h - 50) * 0.1, unit: "celsius" };}
// 日志输出node.log("Processed data from: " + data.device_id);
return msg;Step 4: Switch Node - Data Routing
Section titled “Step 4: Switch Node - Data Routing”使用 Switch 节点根据条件分流数据:
// Switch 节点配置// Condition: >= 30 (高温告警)// Output 1: 正常数据处理// Output 2: 高温告警处理
// 或者使用多个规则:// Rule 1: msg.payload.temperature.value >= 30 → Output 1 (告警)// Rule 2: otherwise → Output 2 (正常)Step 5: Hysteresis Node
Section titled “Step 5: Hysteresis Node”迟滞控制是一种两点控制器,避免系统在阈值附近频繁切换:
安装 hysteresis 节点:1. 点击右上角菜单 → Manage Palette2. 选择 Install 标签3. 搜索 "node-red-contrib-hysteresis"4. 点击 InstallHysteresis 节点配置:
双击 hysteresis 节点:- Upper threshold: 29 (温度超过 29°C 开启风扇)- Lower threshold: 27 (温度低于 27°C 关闭风扇)- Output on high (above upper): true- Output on low (below lower): false- Input field: payload- Output field: payload迟滞控制原理:
温度 (°C) ↑30 │ ░░░░░░29 ├─────────░──↑─── 上限阈值 (风扇开启)28 │ ░ ░27 ├─────────↓──░─── 下限阈值 (风扇关闭)26 │ ░░░░░░ ░ └────────────────────→ 时间 风扇关闭 风扇开启Step 6: MQTT Output - Fan Control
Section titled “Step 6: MQTT Output - Fan Control”1. 拖入 "mqtt out" 节点2. 配置: - Server: 选择 MQTT Broker - Topic: factory/zone1/environment/control - QoS: 1 - Retain: false3. 连接到 hysteresis 节点的输出Step 7: Debug Nodes
Section titled “Step 7: Debug Nodes”添加 Debug 节点监控各个处理阶段:
1. 在关键位置添加 Debug 节点: - MQTT 输入后:查看原始数据 - JSON 解析后:验证解析结果 - Function 节点后:检查处理后的数据 - Hysteresis 后:监控控制信号2. 配置 Debug 输出: - Output: complete msg object - 或: selected propertyComplete Flow Example
Section titled “Complete Flow Example”Flow JSON Export
Section titled “Flow JSON Export”以下是可以直接导入 Node-RED 的完整 Flow:
[ { "id": "mqtt_in_1", "type": "mqtt in", "z": "flow1", "name": "Environment Data", "topic": "factory/zone1/environment", "qos": "1", "broker": "broker_1", "x": 120, "y": 100, "wires": [["json_1"]] }, { "id": "json_1", "type": "json", "z": "flow1", "name": "Parse JSON", "property": "payload", "action": "obj", "x": 290, "y": 100, "wires": [["function_1", "debug_1"]] }, { "id": "function_1", "type": "function", "z": "flow1", "name": "Process Data", "func": "// 增强数据处理\nvar data = msg.payload;\ndata.processed_at = Date.now();\ndata.data_quality = \"good\";\nreturn msg;", "outputs": 1, "x": 470, "y": 100, "wires": [["switch_1"]] }, { "id": "switch_1", "type": "switch", "z": "flow1", "name": "Route by Temp", "property": "payload.temperature.value", "rules": [ { "t": "gte", "v": "30", "checkall": "true" } ], "checkall": "true", "repair": false, "outputs": 2, "x": 650, "y": 100, "wires": [["hysteresis_1"], ["influx_out_1"]] }, { "id": "hysteresis_1", "type": "hysteresis", "z": "flow1", "name": "Fan Hysteresis", "property": "payload", "upper": 29, "lower": 27, "outputs": 1, "x": 830, "y": 80, "wires": [["mqtt_out_1"]] }, { "id": "mqtt_out_1", "type": "mqtt out", "z": "flow1", "name": "Fan Control", "topic": "factory/zone1/environment/control", "qos": "1", "retain": "false", "broker": "broker_1", "x": 1010, "y": 80, "wires": [] }, { "id": "influx_out_1", "type": "influxdb out", "z": "flow1", "name": "Store to InfluxDB", "influxdb": "influx_1", "measurement": "environment", "x": 1010, "y": 140, "wires": [] }, { "id": "debug_1", "type": "debug", "z": "flow1", "name": "Raw Data", "active": true, "tosidebar": true, "console": false, "x": 470, "y": 160, "wires": [] }, { "id": "broker_1", "type": "mqtt-broker", "name": "Local MQTT", "broker": "mosquitto", "port": "1883", "clientid": "", "usetls": false, "compatmode": true }, { "id": "influx_1", "type": "influxdb", "name": "InfluxDB", "host": "influxdb", "port": "8086", "protocol": "http", "database": "nodered", "org": "organization", "token": "", "version": "2.0" }]Flow Testing
Section titled “Flow Testing”1. 部署 Flow (点击右上角 Deploy 按钮)2. 打开 Debug 侧边栏 (右侧虫子图标)3. 检查 Debug 输出是否显示传感器数据4. 验证数据流转完整: - mqtt in → json → function → switch → multiple outputs模拟数据测试
Section titled “模拟数据测试”如果 ESP32 不在线,可使用 Inject 节点模拟数据:
// Inject 节点中设置:// Payload: JSON Object// {// "device_id": "ESP32_001",// "temperature": {"value": 28.5, "unit": "celsius"},// "humidity": {"value": 65, "unit": "percent"},// "lux": {"value": 450, "unit": "lux"}// }Flow 验证检查清单
Section titled “Flow 验证检查清单”- MQTT 节点显示已连接(小圆点绿色)
- Debug 输出显示原始传感器数据
- JSON 解析成功,无错误输出
- Function 节点正确处理数据
- Switch 节点根据条件正确路由
- Hysteresis 节点在阈值切换时正确输出
- MQTT 输出节点发送控制命令到 ESP32
- ✅ 推荐: 每个功能模块添加 Debug 节点方便排查问题
- ✅ 推荐: 使用 Link In/Out 节点跨标签页连接 Flow
- ✅ 推荐: 导出 Flow 时添加注释和分组说明
- ❌ 避免: 在 Function 节点中使用无限循环或长时间运行的操作
- ❌ 避免: 多个 Flow 直接修改同一个上下文变量引发竞态
- ❌ 避免: 生产环境中保留过多的 Debug 节点(影响性能)
Summary
Section titled “Summary”本节要点总结:
- Flow 架构:MQTT 接收 → JSON 解析 → 数据处理 → 分流输出
- 迟滞控制:Hysteresis 节点实现两点控制器,避免频繁切换
- 数据路由:Switch 节点根据条件(如温度阈值)分流数据
- 完整链路:从传感器数据采集到存储、控制、告警的一站式 Flow
- 可复用:导出 Flow JSON 可在不同项目间共享和重用