InfluxDB 集成
InfluxDB 集成
本节介绍如何在 Node-RED 中集成 InfluxDB 时序数据库。学习完成后,您将能够:
- 安装和配置 InfluxDB 节点
- 将传感器数据写入 InfluxDB
- 从 InfluxDB 查询历史数据
- 构建完整的数据采集 Flow
Installing InfluxDB Nodes
Section titled “Installing InfluxDB Nodes”# 通过 Node-RED 面板安装# 菜单 → Manage Palette → Install → "node-red-contrib-influxdb"
# 或通过命令行安装docker exec -it nodered shcd /datanpm install node-red-contrib-influxdbexitdocker restart nodered
# 验证安装# 左侧面板会出现 influxdb in/out 节点Node Configuration
Section titled “Node Configuration”InfluxDB Out Node
Section titled “InfluxDB Out Node”{ "name": "Store Temperature", "influxdb": "influxdb-config", "organization": "iot-demo", "bucket": "nodered", "measurement": "sensor_data", "precision": "ms", "tags": "device", "fields": "temperature,humidity"}InfluxDB In Node
Section titled “InfluxDB In Node”{ "name": "Query Sensor Data", "influxdb": "influxdb-config", "organization": "iot-demo", "bucket": "nodered", "raw": false}Data Writing Patterns
Section titled “Data Writing Patterns”Pattern 1: 单值字段
Section titled “Pattern 1: 单值字段”// 简单温度数据写入// Input: msg.payload = 25.3
msg.payload = { measurement: "temperature", tags: { device: "SENSOR-01", location: "factory-a" }, fields: { value: Number(msg.payload) }};
return msg;Pattern 2: 多值字段
Section titled “Pattern 2: 多值字段”// 写入多个字段// Input: msg.payload = {temp: 25.3, hum: 68.2, press: 1013}
var data = msg.payload;
msg.payload = { measurement: "environment", tags: { device: data.device || "unknown", type: data.type || "sensor" }, fields: { temperature: Number(data.temp), humidity: Number(data.hum), pressure: Number(data.press || 0) }, timestamp: Date.now() * 1000000 // 纳秒};
return msg;Pattern 3: 从 MQTT 直接写入
Section titled “Pattern 3: 从 MQTT 直接写入”完整的 MQTT → InfluxDB 数据流:
[MQTT In: sensor/data] → [JSON: 解析] → [Function: 格式化] → [InfluxDB Out]
ESP32 发送到 MQTT:topic: "sensor/SENSOR-01"payload: '{"temp":25.3,"hum":68.2}'// Function: 格式化为 InfluxDB 点格式try { var data = JSON.parse(msg.payload);
msg.payload = { measurement: "sensor_data", tags: { device: msg.topic.split('/')[1] || "unknown", location: data.location || "unknown" }, fields: { temperature: Number(data.temp), humidity: Number(data.hum), battery: Number(data.battery || 100) } };
return msg;} catch (e) { node.error("Format error: " + e.message); return null;}Data Querying
Section titled “Data Querying”使用 InfluxDB In 节点查询
Section titled “使用 InfluxDB In 节点查询”// 查询最近 1 小时的数据// InfluxDB In 节点配置:// Query: from(bucket: "nodered")// |> range(start: -1h)// |> filter(fn: (r) => r._measurement == "sensor_data")// |> filter(fn: (r) => r.device == "SENSOR-01")
// 结果: msg.payload = 数组格式// [{_time, _value, device, ...}, ...]// 使用 Inject 定期查询最新数据// Inject (每 30 秒) → InfluxDB In → Function → Debug
// Function: 处理查询结果var records = msg.payload;
if (records && records.length > 0) { var latest = records[records.length - 1];
msg.payload = { current_temperature: latest._value, timestamp: latest._time, total_records: records.length };} else { msg.payload = { message: "No data found" };}
return msg;Complete Integration Flow
Section titled “Complete Integration Flow”数据采集 Flow
Section titled “数据采集 Flow”[ESP32] ──MQTT──→ [Mosquitto] │ ▼ [MQTT In: sensor/#] │ ▼ [JSON: 字符串转对象] │ ▼ [Function: 格式化写入] │ ┌────────────┼────────────┐ ▼ ▼ ▼ [InfluxDB Out] [Debug: 监控] [MQTT Out: 确认]批量写入优化
Section titled “批量写入优化”// 使用 Join 节点收集一批数据后批量写入// 提高写入性能
// Join 节点: 收集 10 条消息// Function: 批量格式化
var messages = msg.payload;var points = [];
messages.forEach(function(msg) { points.push({ measurement: "sensor_data", tags: { device: msg.topic }, fields: { value: Number(msg.payload) } });});
msg.payload = points;return msg;Node-RED InfluxDB Settings
Section titled “Node-RED InfluxDB Settings”# Docker Compose 中的配置services: influxdb: image: influxdb:2 container_name: influxdb ports: - "8086:8086" volumes: - ./influxdb/data:/var/lib/influxdb2 environment: - DOCKER_INFLUXDB_INIT_MODE=setup - DOCKER_INFLUXDB_INIT_USERNAME=admin - DOCKER_INFLUXDB_INIT_PASSWORD=password - DOCKER_INFLUXDB_INIT_ORG=iot-demo - DOCKER_INFLUXDB_INIT_BUCKET=nodered - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=your-token# 检查 InfluxDB 状态curl http://localhost:8086/health
# 检查 Tokendocker exec influxdb influx auth list
# Node-RED 配置检查# Host: influxdb (容器名) 或 localhost# Port: 8086# Token: your-token# Organization: iot-demo# Bucket: nodered// 在 Function 节点中添加写入验证var result = msg.payload;if (result && result.error) { node.error("Write error: " + result.error); return null;}return msg;✅ 推荐做法:
- 使用标签 (Tags) 标记设备、位置等维度
- 使用字段 (Fields) 存储测量值
- 合理设计 Measurement 名称(按数据类型)
- 设置保留策略控制数据存储量
- 使用批量写入提高性能
❌ 避免做法:
- Tags 包含高基数数据(如时间戳)
- 字段名使用特殊字符
- 写入频率超过 InfluxDB 处理能力
- 忽略数据验证直接写入
- 不使用 Token 直接写入
Summary
Section titled “Summary”- InfluxDB 节点提供开箱即用的时序数据写入
- Point 格式包含 Measurement/Tags/Fields/Timestamp
- 支持批量写入提高性能
- Flux 查询语言支持复杂的时序分析
- 与 MQTT 结合实现完整的数据采集管道