数据处理与转换
数据处理与转换
本节介绍如何在 Node-RED 中对传感器数据进行处理和转换,包括数据类型转换、单位换算、数据清洗和聚合。学习完成后,您将能够:
- 使用 Function 节点实现数据清洗和验证
- 实现不同数据格式之间的转换
- 配置数据聚合和降采样规则
- 处理传感器数据中的异常值和缺失值
在开始本节之前,请确保:
- Node-RED Flow 已基本搭建完成
- 了解 Function 节点的 JavaScript 编程
- 理解 InfluxDB 的 Line Protocol 数据格式
Data Processing Pipeline
Section titled “Data Processing Pipeline”Processing Flow
Section titled “Processing Flow”原始数据 → 数据清洗 → 格式转换 → 单位换算 → 聚合计算 → 存储/输出每个阶段的功能:
| 阶段 | 功能 | 说明 |
|---|---|---|
| 数据清洗 | 验证数据有效性 | 过滤 NaN、异常值、无效数据 |
| 格式转换 | 调整数据结构 | 从 MQTT JSON 转换为 InfluxDB Line Protocol |
| 单位换算 | 标准化度量单位 | 温度 °C ↔ °F,光照 lux 级别映射 |
| 聚合计算 | 统计汇总 | 均值、最大值、最小值、采样计数 |
| 数据增强 | 补充计算字段 | 体感温度、温湿度指数、热指数 |
Data Cleaning
Section titled “Data Cleaning”Function: Data Validator
Section titled “Function: Data Validator”// 数据验证 Function 节点// 输入: msg.payload 包含传感器数据// 输出: 验证通过的数据,或丢弃无效数据
var data = msg.payload;
// 验证数据是否完整if (!data.temperature || !data.humidity) { node.warn("Missing sensor data, discarding message"); return null; // 丢弃无效数据}
// 验证温度范围var temp = data.temperature.value;if (typeof temp !== 'number' || temp < -20 || temp > 60) { node.warn("Temperature out of range: " + temp); return null;}
// 验证湿度范围var hum = data.humidity.value;if (typeof hum !== 'number' || hum < 0 || hum > 100) { node.warn("Humidity out of range: " + hum); return null;}
// 验证光照范围var lux = data.lux ? data.lux.value : null;if (lux !== null && (typeof lux !== 'number' || lux < 0 || lux > 100000)) { node.warn("Light value out of range: " + lux); lux = null; // 标记异常光线值 data.lux.value = null;}
// 添加数据质量标记var signalStrength = (data.quality && data.quality.signal_rssi) ? data.quality.signal_rssi : -90;
data.data_quality = { rssi: signalStrength, valid: true, checked_at: Date.now()};
// 数据来源信息msg.source = "factory_environment_sensor";
node.log("Validated data from: " + data.device_id);return msg;Function: Outlier Filter
Section titled “Function: Outlier Filter”// 异常值过滤器// 使用滑动窗口检测异常跳变
// 初始化上下文存储(仅在首次运行)var context = flow.get("sensor_history") || { lastTemp: null, lastHum: null, maxDeltaTemp: 5, // 最大允许温度变化 ±5°C maxDeltaHum: 10 // 最大允许湿度变化 ±10%};
var data = msg.payload;var currentTemp = data.temperature.value;var currentHum = data.humidity.value;
// 检查温度跳变if (context.lastTemp !== null) { var tempDelta = Math.abs(currentTemp - context.lastTemp); if (tempDelta > context.maxDeltaTemp) { node.warn("Temperature spike detected: " + context.lastTemp + " → " + currentTemp + "°C"); // 使用前一个值替换(简单处理) // 更精确的做法可能是丢弃或用移动平均替换 data.temperature.value = context.lastTemp; data.filtered = true; }}
// 检查湿度跳变if (context.lastHum !== null) { var humDelta = Math.abs(currentHum - context.lastHum); if (humDelta > context.maxDeltaHum) { node.warn("Humidity spike detected: " + context.lastHum + " → " + currentHum + "%"); data.humidity.value = context.lastHum; data.filtered = true; }}
// 更新历史数据context.lastTemp = currentTemp;context.lastHum = currentHum;flow.set("sensor_history", context);
return msg;Unit Conversion
Section titled “Unit Conversion”Function: Temperature Unit Conversion
Section titled “Function: Temperature Unit Conversion”// 温度单位换算和增强var data = msg.payload;var tempCelsius = data.temperature.value;
// 转换为华氏度var tempFahrenheit = (tempCelsius * 9/5) + 32;
// 转换为开尔文var tempKelvin = tempCelsius + 273.15;
// 添加转换后的值data.temperature.fahrenheit = Math.round(tempFahrenheit * 10) / 10;data.temperature.kelvin = Math.round(tempKelvin * 10) / 10;
// 添加温度水平描述if (tempCelsius < 10) { data.temperature.level = "cold";} else if (tempCelsius < 20) { data.temperature.level = "cool";} else if (tempCelsius < 28) { data.temperature.level = "comfortable";} else if (tempCelsius < 35) { data.temperature.level = "warm";} else { data.temperature.level = "hot";}
return msg;Function: Light Level Classification
Section titled “Function: Light Level Classification”// 光照级别分类var data = msg.payload;var lux = data.lux ? data.lux.value : 0;
// 光照级别分类var lightLevel;if (lux < 1) lightLevel = "dark";else if (lux < 50) lightLevel = "dim";else if (lux < 200) lightLevel = "low";else if (lux < 500) lightLevel = "normal";else if (lux < 1000) lightLevel = "bright";else if (lux < 10000) lightLevel = "very_bright";else lightLevel = "direct_sunlight";
// 添加光照级别data.lux.level = lightLevel;data.lux.category = lux < 200 ? "insufficient" : "adequate";
// 工作区光照合规检查data.light_compliance = { standard: "ISO 8995", minimum: 300, current: lux, compliant: lux >= 300};
return msg;Data Aggregation
Section titled “Data Aggregation”Using Join Node for Aggregation
Section titled “Using Join Node for Aggregation”Node-RED 的 Join 节点可以对一段时间内的数据进行聚合:
Join 节点配置:- Mode: Time-based- Time: 60 seconds (每 60 秒聚合一次)- Count: unlimited- Timeout: 60 seconds- Output: - msg.payload = array of accumulated messagesFunction: Aggregation Processing
Section titled “Function: Aggregation Processing”// 接在 Join 节点后的聚合处理// 输入: msg.payload = [{传感器数据1}, {传感器数据2}, ...]
var samples = msg.payload;var count = samples.length;
if (count === 0) return null;
// 聚合计算var sumTemp = 0, sumHum = 0, sumLux = 0;var minTemp = Infinity, maxTemp = -Infinity;var validSamples = 0;
samples.forEach(function(sample) { if (sample.temperature && sample.humidity) { sumTemp += sample.temperature.value; sumHum += sample.humidity.value;
if (sample.lux) sumLux += sample.lux.value;
minTemp = Math.min(minTemp, sample.temperature.value); maxTemp = Math.max(maxTemp, sample.temperature.value); validSamples++; }});
// 构建聚合结果msg.payload = { device_id: samples[0].device_id, location: samples[0].location, period: { start: samples[0].processed_at || Date.now() - 60000, end: Date.now(), sample_count: count, valid_count: validSamples }, temperature: { avg: Math.round(sumTemp / validSamples * 10) / 10, min: Math.round(minTemp * 10) / 10, max: Math.round(maxTemp * 10) / 10, unit: "celsius" }, humidity: { avg: Math.round(sumHum / validSamples * 10) / 10, unit: "percent" }, lux: { avg: Math.round(sumLux / validSamples), unit: "lux" }};
return msg;InfluxDB Data Formatting
Section titled “InfluxDB Data Formatting”Function: Convert to InfluxDB Line Protocol
Section titled “Function: Convert to InfluxDB Line Protocol”// 转换为 InfluxDB 2.x Line Protocol 格式// 配合 InfluxDB Out 节点使用
var data = msg.payload;
// 构建 InfluxDB 数据点var influxPoint = { measurement: "environment", tags: { device_id: data.device_id || "unknown", location: data.location || "unknown", zone: "zone1", sensor_type: "environmental" }, fields: { temperature: data.temperature ? data.temperature.value : 0, humidity: data.humidity ? data.humidity.value : 0, lux: data.lux ? data.lux.value : 0, feels_like: data.feels_like ? data.feels_like.value : 0, signal_rssi: data.quality ? data.quality.signal_rssi : -100 }, timestamp: Date.now() * 1000000 // InfluxDB 使用纳秒时间戳};
// 可以直接使用 InfluxDB Out 节点,它会自动转换此格式// 或手动构建 Line Protocol 字符串var lineProtocol = influxPoint.measurement + "," + Object.entries(influxPoint.tags).map(function(tag) { return tag[0] + "=" + tag[1]; }).join(",") + " " + Object.entries(influxPoint.fields).map(function(field) { return field[0] + "=" + field[1]; }).join(",") + " " + influxPoint.timestamp;
msg.payload = influxPoint;
return msg;Data Enrichment
Section titled “Data Enrichment”Function: Add Derived Metrics
Section titled “Function: Add Derived Metrics”// 数据增强:添加衍生指标
var data = msg.payload;
// 计算温湿度指数 (THI) - 舒适度指标// THI = T - 0.55 × (1 - RH/100) × (T - 14.5)// 适用于温度超过 14.5°C 的场景if (data.temperature && data.humidity) { var t = data.temperature.value; var h = data.humidity.value;
var thi = t - 0.55 * (1 - h/100) * (t - 14.5);
data.derived = { thi: { value: Math.round(thi * 100) / 100, label: thi < 70 ? "comfortable" : thi < 75 ? "slightly_uncomfortable" : thi < 80 ? "moderately_uncomfortable" : "extremely_uncomfortable" }, dew_point: { value: Math.round((t - (100 - h) / 5) * 10) / 10, unit: "celsius" }, vapor_pressure: { value: Math.round(6.112 * Math.pow(Math.E, (17.67 * t) / (t + 243.5)) * (h/100) * 10) / 10, unit: "hPa" } };}
return msg;验证数据清洗
Section titled “验证数据清洗”1. 在 Function 节点后添加 Debug 节点2. 检查过滤后的数据: - 无效温度 (< -20°C 或 > 60°C) 是否被过滤 - 异常跳变是否被检测和替换 - 数据质量标记是否正确添加验证数据转换
Section titled “验证数据转换”1. 使用 Inject 节点发送模拟数据2. 验证转换结果: - 温度值单位换算是否正确 - 光照级别分类是否符合预期 - InfluxDB Line Protocol 格式是否正确Issue 1: 聚合数据丢失
Section titled “Issue 1: 聚合数据丢失”症状:Join 节点在一段时间后输出为 null
可能原因:
- 时间窗口超出传感器发送间隔
- 传感器停止发送数据
解决方案:
- 设置合理的超时值(大于传感器发送间隔)
- 如果传感器超时未发送数据,发送默认值
Issue 2: 数据类型错误
Section titled “Issue 2: 数据类型错误”症状:InfluxDB 写入失败,提示字段类型不匹配
可能原因:
- 字符串类型错误地写入了数值字段
- JSON 解析后数据类型不符合预期
解决方案:
- 使用
Number()强制转换数值类型 - 使用
typeof检查数据类型后再处理
- ✅ 推荐: 在入口处进行数据验证,尽早过滤无效数据
- ✅ 推荐: 使用 Flow 上下文存储历史数据用于异常检测
- ✅ 推荐: 聚合时包含采样计数,帮助判断数据质量
- ❌ 避免: 在 Function 节点中进行耗时超过 1 秒的操作
- ❌ 避免: 直接信任传感器数据而不做任何验证
- ❌ 避免: 聚合数据丢失原始数据的时间戳信息
Summary
Section titled “Summary”本节要点总结:
- 数据清洗:验证数据范围、检测异常跳变、过滤无效数据
- 格式转换:数据类型转换、单位换算、InfluxDB Line Protocol
- 数据聚合:使用 Join 节点实现时间窗口聚合
- 数据增强:添加衍生指标(THI、露点、蒸气压)
- 质量标记:每个数据处理阶段添加质量信息便于排查问题