功耗数据采集
功耗数据采集
本节介绍如何通过 MQTT 采集智能继电器/插座的能耗数据,并将数据存储到 InfluxDB。学习完成后,您将能够:
- 通过 MQTT 订阅能耗数据
- 使用 Node-RED 解析和格式化能耗数据
- 将能耗数据存储到 InfluxDB
- 设计完整的数据采集管道
Data Flow Architecture
Section titled “Data Flow Architecture”┌─────────────────────────────────────────────────────────────┐│ 数据采集管道 │├─────────────────────────────────────────────────────────────┤│ ││ [Shelly/Tasmota] ││ │ ││ │ MQTT (tele/+/+/SENSOR 或 shelly/+/status) ││ ▼ ││ [Mosquitto Broker] ││ │ ││ ▼ ││ [Node-RED MQTT In] → [Function: 格式化] → [InfluxDB Out] ││ ││ ▼ ││ [InfluxDB] → [Grafana Dashboard] ││ │└─────────────────────────────────────────────────────────────┘MQTT Subscription
Section titled “MQTT Subscription”订阅 Tasmota 能耗数据
Section titled “订阅 Tasmota 能耗数据”// MQTT In 节点配置{ "name": "Tasmota Energy", "topic": "tele/+/+/SENSOR", "qos": 0, "broker": "local-broker", "output": "a parsed JSON object"}订阅 Shelly 能耗数据
Section titled “订阅 Shelly 能耗数据”// MQTT In 节点配置 (Gen1){ "name": "Shelly Energy", "topic": "shelly+/+/status", "qos": 0, "broker": "local-broker"}
// MQTT In 节点配置 (Gen2){ "name": "Shelly Plus Energy", "topic": "shellyplus+/+/status/switch:0", "qos": 0, "broker": "local-broker"}Data Parsing Functions
Section titled “Data Parsing Functions”Function: 解析 Tasmota 能耗数据
Section titled “Function: 解析 Tasmota 能耗数据”// 输入: msg.payload 来自 MQTT In// 原始数据: {"Time":"...","ENERGY":{"Power":42.5,"Voltage":223.1,"Current":0.216,"Total":1.452,"Today":0.629}}
var energy = msg.payload.ENERGY;if (!energy) { return null; // 非能耗数据,丢弃}
// 提取设备 ID 从 Topic// tele/tasmota/socket1/SENSORvar topicParts = msg.topic.split('/');var deviceId = topicParts[2] || "unknown";
// 格式化为 InfluxDB Pointmsg.payload = { measurement: "energy_consumption", tags: { device: deviceId, source: "tasmota" }, fields: { power: Number(energy.Power), voltage: Number(energy.Voltage), current: Number(energy.Current), total_kwh: Number(energy.Total), today_kwh: Number(energy.Today), period_kwh: Number(energy.Period), apparent_power: Number(energy.ApparentPower || 0), power_factor: Number(energy.Factor || 0) }};
return msg;Function: 解析 Shelly Gen2 能耗数据
Section titled “Function: 解析 Shelly Gen2 能耗数据”// 输入: msg.payload 来自 MQTT In (Shelly Gen2)// 原始数据: {"id":0,"output":true,"apower":42.5,"voltage":223.1,"current":0.216,"aenergy":{"total":1452,"by_minute":[12.3,15.1]}}
var data = msg.payload;if (!data || data.apower === undefined) { return null;}
// 提取设备 ID// shellyplus1pm-ABC123/status/switch:0var deviceId = msg.topic.split('/')[0] || "shelly-unknown";
msg.payload = { measurement: "energy_consumption", tags: { device: deviceId, source: "shelly-gen2", relay_id: String(data.id || 0) }, fields: { power: Number(data.apower), voltage: Number(data.voltage || 0), current: Number(data.current || 0), total_kwh: Number((data.aenergy && data.aenergy.total) ? data.aenergy.total / 1000 : 0), temperature: Number((data.temperature && data.temperature.tC) || 0) }};
return msg;InfluxDB Storage Configuration
Section titled “InfluxDB Storage Configuration”// InfluxDB Out 节点配置{ "name": "Store Energy Data", "influxdb": "influxdb-config", "organization": "iot-demo", "bucket": "nodered", "measurement": "energy_consumption", "precision": "ms", "tags": "device,source,relay_id", "fields": "power,voltage,current,total_kwh,today_kwh"}Complete Data Flow
Section titled “Complete Data Flow”[Inject: 每 10 秒] (用于模拟) │ ▼[Function: Mock Data] │ ▼[MQTT Out: tele/tasmota/socket1/SENSOR] (模拟 Tasmota) │ ▼[Mosquitto Broker] │ ▼[MQTT In: tele/+/+/SENSOR] │ ▼[Function: 解析能耗数据] │ ├──→ [InfluxDB Out] ──→ InfluxDB │ └──→ [Debug: 监控]# 1. 检查 MQTT 数据流mosquitto_sub -h localhost -t "tele/+/+/SENSOR" -v
# 2. 检查 InfluxDB 数据docker exec influxdb influx query \ 'from(bucket: "nodered") |> range(start: -15m) |> filter(fn: (r) => r._measurement == "energy_consumption") |> filter(fn: (r) => r._field == "power")'预期结果:
- MQTT 每 TelePeriod 秒收到一条 SENSOR 消息
- InfluxDB 中有 energy_consumption 时序数据
- Grafana 中可以看到实时更新的能耗图表
问题: 数据未写入 InfluxDB
Section titled “问题: 数据未写入 InfluxDB”# 1. 检查 MQTT In 节点是否收到数据# Node-RED Debug 面板查看 msg.payload
# 2. 检查 Function 节点是否返回了正确的 Point 格式# 添加 node.warn() 调试node.warn("Formatted payload: " + JSON.stringify(msg.payload));
# 3. 检查 InfluxDB Out 节点配置# Token、Organization、Bucket 是否正确?Common Customer Questions
Section titled “Common Customer Questions”Q1: 能耗数据采集频率多高合适?
Section titled “Q1: 能耗数据采集频率多高合适?”| 场景 | 推荐频率 | 原因 |
|---|---|---|
| POC 演示 | 10-30 秒 | 展示实时效果 |
| 生产环境 | 1-5 分钟 | 减少数据库负载 |
| 高精度监测 | 10 秒 | 捕捉瞬态变化 |
| 能效分析 | 5-15 分钟 | 长期趋势分析 |
Q2: 如何处理多台设备的能耗数据?
Section titled “Q2: 如何处理多台设备的能耗数据?”使用 Tags 区分设备。在 InfluxDB 中为每个设备分配唯一的 device tag,Grafana 中通过变量实现设备筛选。
✅ 推荐做法:
- 使用 Tags 区分设备来源(device、location、source)
- 数据写入前验证字段类型(Number() 转换)
- 设置合理的 TelePeriod 和采集频率
- 添加数据有效性检查(过滤异常值)
❌ 避免做法:
- 高频写入导致 InfluxDB 负载过大
- 不验证数据类型直接写入
- Tags 包含时间戳等高基数数据
- 忽略数据单元转换(W vs kW, Wh vs kWh)
Summary
Section titled “Summary”- MQTT 订阅能耗数据的 Topic 结构(tele/+/+/SENSOR)
- Function 节点将原始数据格式化为 InfluxDB Point
- Tags 用于区分设备来源,Fields 存储测量值
- InfluxDB 提供高效的时序数据存储
- 完整的采集管道:设备 → MQTT → Node-RED → InfluxDB