跳转到内容

数据处理与转换

数据处理与转换

本节介绍如何在 Node-RED 中对传感器数据进行处理和转换,包括数据类型转换、单位换算、数据清洗和聚合。学习完成后,您将能够:

  • 使用 Function 节点实现数据清洗和验证
  • 实现不同数据格式之间的转换
  • 配置数据聚合和降采样规则
  • 处理传感器数据中的异常值和缺失值

在开始本节之前,请确保:

  • Node-RED Flow 已基本搭建完成
  • 了解 Function 节点的 JavaScript 编程
  • 理解 InfluxDB 的 Line Protocol 数据格式
原始数据 → 数据清洗 → 格式转换 → 单位换算 → 聚合计算 → 存储/输出

每个阶段的功能:

阶段功能说明
数据清洗验证数据有效性过滤 NaN、异常值、无效数据
格式转换调整数据结构从 MQTT JSON 转换为 InfluxDB Line Protocol
单位换算标准化度量单位温度 °C ↔ °F,光照 lux 级别映射
聚合计算统计汇总均值、最大值、最小值、采样计数
数据增强补充计算字段体感温度、温湿度指数、热指数
// 数据验证 Function 节点
// 输入: msg.payload 包含传感器数据
// 输出: 验证通过的数据,或丢弃无效数据
var data = msg.payload;
// 验证数据是否完整
if (!data.temperature || !data.humidity) {
node.warn("Missing sensor data, discarding message");
return null; // 丢弃无效数据
}
// 验证温度范围
var temp = data.temperature.value;
if (typeof temp !== 'number' || temp < -20 || temp > 60) {
node.warn("Temperature out of range: " + temp);
return null;
}
// 验证湿度范围
var hum = data.humidity.value;
if (typeof hum !== 'number' || hum < 0 || hum > 100) {
node.warn("Humidity out of range: " + hum);
return null;
}
// 验证光照范围
var lux = data.lux ? data.lux.value : null;
if (lux !== null && (typeof lux !== 'number' || lux < 0 || lux > 100000)) {
node.warn("Light value out of range: " + lux);
lux = null; // 标记异常光线值
data.lux.value = null;
}
// 添加数据质量标记
var signalStrength = (data.quality && data.quality.signal_rssi)
? data.quality.signal_rssi : -90;
data.data_quality = {
rssi: signalStrength,
valid: true,
checked_at: Date.now()
};
// 数据来源信息
msg.source = "factory_environment_sensor";
node.log("Validated data from: " + data.device_id);
return msg;
// 异常值过滤器
// 使用滑动窗口检测异常跳变
// 初始化上下文存储(仅在首次运行)
var context = flow.get("sensor_history") || {
lastTemp: null,
lastHum: null,
maxDeltaTemp: 5, // 最大允许温度变化 ±5°C
maxDeltaHum: 10 // 最大允许湿度变化 ±10%
};
var data = msg.payload;
var currentTemp = data.temperature.value;
var currentHum = data.humidity.value;
// 检查温度跳变
if (context.lastTemp !== null) {
var tempDelta = Math.abs(currentTemp - context.lastTemp);
if (tempDelta > context.maxDeltaTemp) {
node.warn("Temperature spike detected: " +
context.lastTemp + "" + currentTemp + "°C");
// 使用前一个值替换(简单处理)
// 更精确的做法可能是丢弃或用移动平均替换
data.temperature.value = context.lastTemp;
data.filtered = true;
}
}
// 检查湿度跳变
if (context.lastHum !== null) {
var humDelta = Math.abs(currentHum - context.lastHum);
if (humDelta > context.maxDeltaHum) {
node.warn("Humidity spike detected: " +
context.lastHum + "" + currentHum + "%");
data.humidity.value = context.lastHum;
data.filtered = true;
}
}
// 更新历史数据
context.lastTemp = currentTemp;
context.lastHum = currentHum;
flow.set("sensor_history", context);
return msg;
// 温度单位换算和增强
var data = msg.payload;
var tempCelsius = data.temperature.value;
// 转换为华氏度
var tempFahrenheit = (tempCelsius * 9/5) + 32;
// 转换为开尔文
var tempKelvin = tempCelsius + 273.15;
// 添加转换后的值
data.temperature.fahrenheit = Math.round(tempFahrenheit * 10) / 10;
data.temperature.kelvin = Math.round(tempKelvin * 10) / 10;
// 添加温度水平描述
if (tempCelsius < 10) {
data.temperature.level = "cold";
} else if (tempCelsius < 20) {
data.temperature.level = "cool";
} else if (tempCelsius < 28) {
data.temperature.level = "comfortable";
} else if (tempCelsius < 35) {
data.temperature.level = "warm";
} else {
data.temperature.level = "hot";
}
return msg;
// 光照级别分类
var data = msg.payload;
var lux = data.lux ? data.lux.value : 0;
// 光照级别分类
var lightLevel;
if (lux < 1) lightLevel = "dark";
else if (lux < 50) lightLevel = "dim";
else if (lux < 200) lightLevel = "low";
else if (lux < 500) lightLevel = "normal";
else if (lux < 1000) lightLevel = "bright";
else if (lux < 10000) lightLevel = "very_bright";
else lightLevel = "direct_sunlight";
// 添加光照级别
data.lux.level = lightLevel;
data.lux.category = lux < 200 ? "insufficient" : "adequate";
// 工作区光照合规检查
data.light_compliance = {
standard: "ISO 8995",
minimum: 300,
current: lux,
compliant: lux >= 300
};
return msg;

Node-RED 的 Join 节点可以对一段时间内的数据进行聚合:

Join 节点配置:
- Mode: Time-based
- Time: 60 seconds (每 60 秒聚合一次)
- Count: unlimited
- Timeout: 60 seconds
- Output:
- msg.payload = array of accumulated messages
// 接在 Join 节点后的聚合处理
// 输入: msg.payload = [{传感器数据1}, {传感器数据2}, ...]
var samples = msg.payload;
var count = samples.length;
if (count === 0) return null;
// 聚合计算
var sumTemp = 0, sumHum = 0, sumLux = 0;
var minTemp = Infinity, maxTemp = -Infinity;
var validSamples = 0;
samples.forEach(function(sample) {
if (sample.temperature && sample.humidity) {
sumTemp += sample.temperature.value;
sumHum += sample.humidity.value;
if (sample.lux) sumLux += sample.lux.value;
minTemp = Math.min(minTemp, sample.temperature.value);
maxTemp = Math.max(maxTemp, sample.temperature.value);
validSamples++;
}
});
// 构建聚合结果
msg.payload = {
device_id: samples[0].device_id,
location: samples[0].location,
period: {
start: samples[0].processed_at || Date.now() - 60000,
end: Date.now(),
sample_count: count,
valid_count: validSamples
},
temperature: {
avg: Math.round(sumTemp / validSamples * 10) / 10,
min: Math.round(minTemp * 10) / 10,
max: Math.round(maxTemp * 10) / 10,
unit: "celsius"
},
humidity: {
avg: Math.round(sumHum / validSamples * 10) / 10,
unit: "percent"
},
lux: {
avg: Math.round(sumLux / validSamples),
unit: "lux"
}
};
return msg;

Function: Convert to InfluxDB Line Protocol

Section titled “Function: Convert to InfluxDB Line Protocol”
// 转换为 InfluxDB 2.x Line Protocol 格式
// 配合 InfluxDB Out 节点使用
var data = msg.payload;
// 构建 InfluxDB 数据点
var influxPoint = {
measurement: "environment",
tags: {
device_id: data.device_id || "unknown",
location: data.location || "unknown",
zone: "zone1",
sensor_type: "environmental"
},
fields: {
temperature: data.temperature ? data.temperature.value : 0,
humidity: data.humidity ? data.humidity.value : 0,
lux: data.lux ? data.lux.value : 0,
feels_like: data.feels_like ? data.feels_like.value : 0,
signal_rssi: data.quality ? data.quality.signal_rssi : -100
},
timestamp: Date.now() * 1000000 // InfluxDB 使用纳秒时间戳
};
// 可以直接使用 InfluxDB Out 节点,它会自动转换此格式
// 或手动构建 Line Protocol 字符串
var lineProtocol = influxPoint.measurement +
"," +
Object.entries(influxPoint.tags).map(function(tag) {
return tag[0] + "=" + tag[1];
}).join(",") +
" " +
Object.entries(influxPoint.fields).map(function(field) {
return field[0] + "=" + field[1];
}).join(",") +
" " +
influxPoint.timestamp;
msg.payload = influxPoint;
return msg;
// 数据增强:添加衍生指标
var data = msg.payload;
// 计算温湿度指数 (THI) - 舒适度指标
// THI = T - 0.55 × (1 - RH/100) × (T - 14.5)
// 适用于温度超过 14.5°C 的场景
if (data.temperature && data.humidity) {
var t = data.temperature.value;
var h = data.humidity.value;
var thi = t - 0.55 * (1 - h/100) * (t - 14.5);
data.derived = {
thi: {
value: Math.round(thi * 100) / 100,
label: thi < 70 ? "comfortable" :
thi < 75 ? "slightly_uncomfortable" :
thi < 80 ? "moderately_uncomfortable" :
"extremely_uncomfortable"
},
dew_point: {
value: Math.round((t - (100 - h) / 5) * 10) / 10,
unit: "celsius"
},
vapor_pressure: {
value: Math.round(6.112 * Math.pow(Math.E, (17.67 * t) / (t + 243.5)) * (h/100) * 10) / 10,
unit: "hPa"
}
};
}
return msg;
1. 在 Function 节点后添加 Debug 节点
2. 检查过滤后的数据:
- 无效温度 (< -20°C 或 > 60°C) 是否被过滤
- 异常跳变是否被检测和替换
- 数据质量标记是否正确添加
1. 使用 Inject 节点发送模拟数据
2. 验证转换结果:
- 温度值单位换算是否正确
- 光照级别分类是否符合预期
- InfluxDB Line Protocol 格式是否正确

症状:Join 节点在一段时间后输出为 null

可能原因

  • 时间窗口超出传感器发送间隔
  • 传感器停止发送数据

解决方案

  1. 设置合理的超时值(大于传感器发送间隔)
  2. 如果传感器超时未发送数据,发送默认值

症状:InfluxDB 写入失败,提示字段类型不匹配

可能原因

  • 字符串类型错误地写入了数值字段
  • JSON 解析后数据类型不符合预期

解决方案

  1. 使用 Number() 强制转换数值类型
  2. 使用 typeof 检查数据类型后再处理
  • 推荐: 在入口处进行数据验证,尽早过滤无效数据
  • 推荐: 使用 Flow 上下文存储历史数据用于异常检测
  • 推荐: 聚合时包含采样计数,帮助判断数据质量
  • 避免: 在 Function 节点中进行耗时超过 1 秒的操作
  • 避免: 直接信任传感器数据而不做任何验证
  • 避免: 聚合数据丢失原始数据的时间戳信息

本节要点总结:

  1. 数据清洗:验证数据范围、检测异常跳变、过滤无效数据
  2. 格式转换:数据类型转换、单位换算、InfluxDB Line Protocol
  3. 数据聚合:使用 Join 节点实现时间窗口聚合
  4. 数据增强:添加衍生指标(THI、露点、蒸气压)
  5. 质量标记:每个数据处理阶段添加质量信息便于排查问题