跳转到内容

功耗数据采集

功耗数据采集

本节介绍如何通过 MQTT 采集智能继电器/插座的能耗数据,并将数据存储到 InfluxDB。学习完成后,您将能够:

  • 通过 MQTT 订阅能耗数据
  • 使用 Node-RED 解析和格式化能耗数据
  • 将能耗数据存储到 InfluxDB
  • 设计完整的数据采集管道
┌─────────────────────────────────────────────────────────────┐
│ 数据采集管道 │
├─────────────────────────────────────────────────────────────┤
│ │
│ [Shelly/Tasmota] │
│ │ │
│ │ MQTT (tele/+/+/SENSOR 或 shelly/+/status) │
│ ▼ │
│ [Mosquitto Broker] │
│ │ │
│ ▼ │
│ [Node-RED MQTT In] → [Function: 格式化] → [InfluxDB Out] │
│ │
│ ▼ │
│ [InfluxDB] → [Grafana Dashboard] │
│ │
└─────────────────────────────────────────────────────────────┘
// MQTT In 节点配置
{
"name": "Tasmota Energy",
"topic": "tele/+/+/SENSOR",
"qos": 0,
"broker": "local-broker",
"output": "a parsed JSON object"
}
// MQTT In 节点配置 (Gen1)
{
"name": "Shelly Energy",
"topic": "shelly+/+/status",
"qos": 0,
"broker": "local-broker"
}
// MQTT In 节点配置 (Gen2)
{
"name": "Shelly Plus Energy",
"topic": "shellyplus+/+/status/switch:0",
"qos": 0,
"broker": "local-broker"
}
// 输入: msg.payload 来自 MQTT In
// 原始数据: {"Time":"...","ENERGY":{"Power":42.5,"Voltage":223.1,"Current":0.216,"Total":1.452,"Today":0.629}}
var energy = msg.payload.ENERGY;
if (!energy) {
return null; // 非能耗数据,丢弃
}
// 提取设备 ID 从 Topic
// tele/tasmota/socket1/SENSOR
var topicParts = msg.topic.split('/');
var deviceId = topicParts[2] || "unknown";
// 格式化为 InfluxDB Point
msg.payload = {
measurement: "energy_consumption",
tags: {
device: deviceId,
source: "tasmota"
},
fields: {
power: Number(energy.Power),
voltage: Number(energy.Voltage),
current: Number(energy.Current),
total_kwh: Number(energy.Total),
today_kwh: Number(energy.Today),
period_kwh: Number(energy.Period),
apparent_power: Number(energy.ApparentPower || 0),
power_factor: Number(energy.Factor || 0)
}
};
return msg;
// 输入: msg.payload 来自 MQTT In (Shelly Gen2)
// 原始数据: {"id":0,"output":true,"apower":42.5,"voltage":223.1,"current":0.216,"aenergy":{"total":1452,"by_minute":[12.3,15.1]}}
var data = msg.payload;
if (!data || data.apower === undefined) {
return null;
}
// 提取设备 ID
// shellyplus1pm-ABC123/status/switch:0
var deviceId = msg.topic.split('/')[0] || "shelly-unknown";
msg.payload = {
measurement: "energy_consumption",
tags: {
device: deviceId,
source: "shelly-gen2",
relay_id: String(data.id || 0)
},
fields: {
power: Number(data.apower),
voltage: Number(data.voltage || 0),
current: Number(data.current || 0),
total_kwh: Number((data.aenergy && data.aenergy.total) ? data.aenergy.total / 1000 : 0),
temperature: Number((data.temperature && data.temperature.tC) || 0)
}
};
return msg;
// InfluxDB Out 节点配置
{
"name": "Store Energy Data",
"influxdb": "influxdb-config",
"organization": "iot-demo",
"bucket": "nodered",
"measurement": "energy_consumption",
"precision": "ms",
"tags": "device,source,relay_id",
"fields": "power,voltage,current,total_kwh,today_kwh"
}
[Inject: 每 10 秒] (用于模拟)
[Function: Mock Data]
[MQTT Out: tele/tasmota/socket1/SENSOR] (模拟 Tasmota)
[Mosquitto Broker]
[MQTT In: tele/+/+/SENSOR]
[Function: 解析能耗数据]
├──→ [InfluxDB Out] ──→ InfluxDB
└──→ [Debug: 监控]
Terminal window
# 1. 检查 MQTT 数据流
mosquitto_sub -h localhost -t "tele/+/+/SENSOR" -v
# 2. 检查 InfluxDB 数据
docker exec influxdb influx query \
'from(bucket: "nodered")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "energy_consumption")
|> filter(fn: (r) => r._field == "power")'

预期结果:

  • MQTT 每 TelePeriod 秒收到一条 SENSOR 消息
  • InfluxDB 中有 energy_consumption 时序数据
  • Grafana 中可以看到实时更新的能耗图表
Terminal window
# 1. 检查 MQTT In 节点是否收到数据
# Node-RED Debug 面板查看 msg.payload
# 2. 检查 Function 节点是否返回了正确的 Point 格式
# 添加 node.warn() 调试
node.warn("Formatted payload: " + JSON.stringify(msg.payload));
# 3. 检查 InfluxDB Out 节点配置
# Token、Organization、Bucket 是否正确?

Q1: 能耗数据采集频率多高合适?

Section titled “Q1: 能耗数据采集频率多高合适?”
场景推荐频率原因
POC 演示10-30 秒展示实时效果
生产环境1-5 分钟减少数据库负载
高精度监测10 秒捕捉瞬态变化
能效分析5-15 分钟长期趋势分析

Q2: 如何处理多台设备的能耗数据?

Section titled “Q2: 如何处理多台设备的能耗数据?”

使用 Tags 区分设备。在 InfluxDB 中为每个设备分配唯一的 device tag,Grafana 中通过变量实现设备筛选。

推荐做法:

  • 使用 Tags 区分设备来源(device、location、source)
  • 数据写入前验证字段类型(Number() 转换)
  • 设置合理的 TelePeriod 和采集频率
  • 添加数据有效性检查(过滤异常值)

避免做法:

  • 高频写入导致 InfluxDB 负载过大
  • 不验证数据类型直接写入
  • Tags 包含时间戳等高基数数据
  • 忽略数据单元转换(W vs kW, Wh vs kWh)
  1. MQTT 订阅能耗数据的 Topic 结构(tele/+/+/SENSOR)
  2. Function 节点将原始数据格式化为 InfluxDB Point
  3. Tags 用于区分设备来源,Fields 存储测量值
  4. InfluxDB 提供高效的时序数据存储
  5. 完整的采集管道:设备 → MQTT → Node-RED → InfluxDB