跳转到内容

InfluxDB 集成

InfluxDB 集成

本节介绍如何在 Node-RED 中集成 InfluxDB 时序数据库。学习完成后,您将能够:

  • 安装和配置 InfluxDB 节点
  • 将传感器数据写入 InfluxDB
  • 从 InfluxDB 查询历史数据
  • 构建完整的数据采集 Flow
Terminal window
# 通过 Node-RED 面板安装
# 菜单 → Manage Palette → Install → "node-red-contrib-influxdb"
# 或通过命令行安装
docker exec -it nodered sh
cd /data
npm install node-red-contrib-influxdb
exit
docker restart nodered
# 验证安装
# 左侧面板会出现 influxdb in/out 节点
{
"name": "Store Temperature",
"influxdb": "influxdb-config",
"organization": "iot-demo",
"bucket": "nodered",
"measurement": "sensor_data",
"precision": "ms",
"tags": "device",
"fields": "temperature,humidity"
}
{
"name": "Query Sensor Data",
"influxdb": "influxdb-config",
"organization": "iot-demo",
"bucket": "nodered",
"raw": false
}
// 简单温度数据写入
// Input: msg.payload = 25.3
msg.payload = {
measurement: "temperature",
tags: {
device: "SENSOR-01",
location: "factory-a"
},
fields: {
value: Number(msg.payload)
}
};
return msg;
// 写入多个字段
// Input: msg.payload = {temp: 25.3, hum: 68.2, press: 1013}
var data = msg.payload;
msg.payload = {
measurement: "environment",
tags: {
device: data.device || "unknown",
type: data.type || "sensor"
},
fields: {
temperature: Number(data.temp),
humidity: Number(data.hum),
pressure: Number(data.press || 0)
},
timestamp: Date.now() * 1000000 // 纳秒
};
return msg;

完整的 MQTT → InfluxDB 数据流:

[MQTT In: sensor/data] → [JSON: 解析] → [Function: 格式化] → [InfluxDB Out]
ESP32 发送到 MQTT:
topic: "sensor/SENSOR-01"
payload: '{"temp":25.3,"hum":68.2}'
// Function: 格式化为 InfluxDB 点格式
try {
var data = JSON.parse(msg.payload);
msg.payload = {
measurement: "sensor_data",
tags: {
device: msg.topic.split('/')[1] || "unknown",
location: data.location || "unknown"
},
fields: {
temperature: Number(data.temp),
humidity: Number(data.hum),
battery: Number(data.battery || 100)
}
};
return msg;
} catch (e) {
node.error("Format error: " + e.message);
return null;
}
// 查询最近 1 小时的数据
// InfluxDB In 节点配置:
// Query: from(bucket: "nodered")
// |> range(start: -1h)
// |> filter(fn: (r) => r._measurement == "sensor_data")
// |> filter(fn: (r) => r.device == "SENSOR-01")
// 结果: msg.payload = 数组格式
// [{_time, _value, device, ...}, ...]
// 使用 Inject 定期查询最新数据
// Inject (每 30 秒) → InfluxDB In → Function → Debug
// Function: 处理查询结果
var records = msg.payload;
if (records && records.length > 0) {
var latest = records[records.length - 1];
msg.payload = {
current_temperature: latest._value,
timestamp: latest._time,
total_records: records.length
};
} else {
msg.payload = { message: "No data found" };
}
return msg;
[ESP32] ──MQTT──→ [Mosquitto]
[MQTT In: sensor/#]
[JSON: 字符串转对象]
[Function: 格式化写入]
┌────────────┼────────────┐
▼ ▼ ▼
[InfluxDB Out] [Debug: 监控] [MQTT Out: 确认]
// 使用 Join 节点收集一批数据后批量写入
// 提高写入性能
// Join 节点: 收集 10 条消息
// Function: 批量格式化
var messages = msg.payload;
var points = [];
messages.forEach(function(msg) {
points.push({
measurement: "sensor_data",
tags: { device: msg.topic },
fields: { value: Number(msg.payload) }
});
});
msg.payload = points;
return msg;
# Docker Compose 中的配置
services:
influxdb:
image: influxdb:2
container_name: influxdb
ports:
- "8086:8086"
volumes:
- ./influxdb/data:/var/lib/influxdb2
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=password
- DOCKER_INFLUXDB_INIT_ORG=iot-demo
- DOCKER_INFLUXDB_INIT_BUCKET=nodered
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=your-token
Terminal window
# 检查 InfluxDB 状态
curl http://localhost:8086/health
# 检查 Token
docker exec influxdb influx auth list
# Node-RED 配置检查
# Host: influxdb (容器名) 或 localhost
# Port: 8086
# Token: your-token
# Organization: iot-demo
# Bucket: nodered
// 在 Function 节点中添加写入验证
var result = msg.payload;
if (result && result.error) {
node.error("Write error: " + result.error);
return null;
}
return msg;

推荐做法:

  • 使用标签 (Tags) 标记设备、位置等维度
  • 使用字段 (Fields) 存储测量值
  • 合理设计 Measurement 名称(按数据类型)
  • 设置保留策略控制数据存储量
  • 使用批量写入提高性能

避免做法:

  • Tags 包含高基数数据(如时间戳)
  • 字段名使用特殊字符
  • 写入频率超过 InfluxDB 处理能力
  • 忽略数据验证直接写入
  • 不使用 Token 直接写入
  1. InfluxDB 节点提供开箱即用的时序数据写入
  2. Point 格式包含 Measurement/Tags/Fields/Timestamp
  3. 支持批量写入提高性能
  4. Flux 查询语言支持复杂的时序分析
  5. 与 MQTT 结合实现完整的数据采集管道