跳转到内容

InfluxDB 数据存储

InfluxDB 数据存储

本节介绍如何将处理后的环境数据存储到 InfluxDB 时序数据库中。学习完成后,您将能够:

  • 理解 InfluxDB 的核心概念(Measurement、Tag、Field、Bucket)
  • 在 Node-RED 中配置 InfluxDB 输出节点
  • 设计高效的时序数据存储结构
  • 配置数据保留策略和查询优化

在开始本节之前,请确保:

  • InfluxDB 2.x 已安装并运行
  • Node-RED InfluxDB 节点已安装
  • 已完成数据处理 Flow
  • 了解 InfluxDB 的基本概念

InfluxDB 是一种专为时序数据优化的数据库,其数据模型包含以下核心概念:

┌─────────────────────────────────────────────────────┐
│ Measurement │
│ (类似 SQL 表名) │
├─────────────────────────────────────────────────────┤
│ Tags (标签) │ Fields (字段) │
│ - 索引字段 │ - 实际数值 │
│ - 用于过滤和分组 │ - 不建立索引 │
│ - 如 device_id │ - 如 temperature │
├─────────────────────────────────────────────────────┤
│ Timestamp (时间戳) │
│ - 默认纳秒精度 │
│ - 自动生成或指定 │
└─────────────────────────────────────────────────────┘
概念说明类比 SQL
Bucket数据存储桶,包含保留策略Database
Measurement测量项名Table
Tag带索引的元数据Indexed Column
Field实际数值Non-Indexed Column
Point单条数据记录Row
Series相同 measurement+tag 的集合Group
Retention Policy数据自动删除策略TTL
✅ GOOD Schema Design:
measurement="environment"
tags: device_id="ESP32_001", location="zone1"
fields: temperature=26.5, humidity=62.3, lux=450
timestamp: 1697123456000000000
❌ BAD Schema Design:
measurement="zone1_temperature"
tags: device="ESP32_001"
fields: value=26.5
timestamp: 1697123456000000000

原则:高基数数据作为 Tag,变化频繁的数值作为 Field

1. 打开 Node-RED → 右上角菜单 → Manage Palette
2. 选择 Install 标签
3. 搜索 "node-red-contrib-influxdb"
4. 安装最新版本
1. 从左侧节点面板拖入 "influxdb" 节点
2. 双击配置:
- Name: InfluxDB 2.x
- Version: 2.0
- URL: http://influxdb:8086
- Token: [从 InfluxDB UI 获取]
- Organization: organization
- Bucket: nodered
- Skip SSL Verification: true (仅开发环境)
3. 点击 Add 保存

获取 InfluxDB Token

Terminal window
# 通过 InfluxDB UI 获取
1. 访问 http://localhost:8086
2. 登录 Data API Tokens
3. 点击 Generate Custom API Token
4. 选择读写权限 生成 Token
1. 拖入 "influxdb out" 节点
2. 配置:
- Server: 选择刚配置的 InfluxDB 连接
- Measurement: environment
- Organization: organization
- Bucket: nodered
- Precision: milliseconds

最简单的存储方式——直接使用 InfluxDB Out 节点:

// 在 InfluxDB Out 节点前添加 Function 节点
// 构建 InfluxDB 兼容的数据格式
var data = msg.payload;
// 构建 InfluxDB 数据点对象
msg.payload = [{
measurement: "environment",
tags: {
device_id: data.device_id || "unknown",
location: data.location || "unknown",
sensor_type: "environmental"
},
fields: {
temperature: Number(data.temperature.value),
humidity: Number(data.humidity.value),
lux: Number(data.lux.value),
signal_rssi: data.quality ? Number(data.quality.signal_rssi) : -100
},
timestamp: Date.now()
}];
return msg;
// 使用 HTTP Request 节点直接写入 InfluxDB API
// HTTP Request 配置:
// URL: http://influxdb:8086/api/v2/write?org=organization&bucket=nodered&precision=ms
// Method: POST
// Headers:
// Authorization: Token YOUR_INFLUXDB_TOKEN
// Content-Type: text/plain
// Function 节点构建 Line Protocol 字符串
var data = msg.payload;
// Line Protocol 格式: measurement,tags fields timestamp
var line = "environment," +
"device_id=" + data.device_id + "," +
"location=" + data.location + "," +
"sensor_type=environmental " +
"temperature=" + Number(data.temperature.value) + "," +
"humidity=" + Number(data.humidity.value) + "," +
"lux=" + Number(data.lux.value) + " " +
Date.now();
msg.payload = line;
msg.headers = {
"Authorization": "Token YOUR_INFLUXDB_TOKEN",
"Content-Type": "text/plain"
};
return msg;
Measurement: environment
Tags:
device_id - 设备唯一标识(如 "ESP32_001")
location - 安装位置(如 "Factory Zone 1")
sensor_type - 传感器类型(如 "environmental")
firmware_ver - 固件版本(如 "2.1.0")
Fields:
temperature - 温度值 (float)
humidity - 湿度百分比 (float)
lux - 光照值 (float)
feels_like - 体感温度 (float)
signal_rssi - WiFi 信号强度 (integer)
Timestamp: Unix timestamp (毫秒精度)
设计要点推荐做法原因
Tag 选择选择基数低、查询频繁的字段Tag 有索引,查询更快
Field 选择数值型测量数据Field 无索引,适合存储
Measurement 命名小写、下划线分隔InfluxDB 命名规范
Tag 值固定值或有限枚举高基数 Tag 影响性能
时间戳精度毫秒或纳秒满足 IoT 数据的精度需求
Terminal window
# 通过 InfluxDB CLI 或 UI 配置数据保留策略
# 保留 30 天的原始数据
# 之后自动删除

InfluxDB UI 设置

1. 访问 http://localhost:8086
2. Load Data → Buckets
3. 选择 "nodered" bucket
4. 点击 "Update Retention"
5. 设置 "30d" (30 天)

对于长期数据存储,建议实施降采样:

// 持续查询:每小时聚合数据
// 在 InfluxDB 中创建 Task
// Function 节点:写入原始数据的同时写入聚合
msg.aggregate = {
measurement: "environment_1h",
tags: {
device_id: data.device_id,
location: data.location
},
fields: {
temp_avg: data.temperature.value,
temp_min: data.temperature.value,
temp_max: data.temperature.value,
sample_count: 1
},
timestamp: Math.floor(Date.now() / 3600000) * 3600000
};
// 查询最近 1 小时的环境数据
from(bucket: "nodered")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "environment")
|> filter(fn: (r) => r.location == "Factory Zone 1")
|> yield(name: "last_hour")
// 查询温度均值和最大值(按小时聚合)
from(bucket: "nodered")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "environment")
|> filter(fn: (r) => r._field == "temperature")
|> aggregateWindow(every: 1h, fn: mean)
|> yield(name: "hourly_avg")
Terminal window
# 查询最近 5 分钟的温度数据
curl -X POST http://localhost:8086/api/v2/query?org=organization \
-H "Authorization: Token YOUR_TOKEN" \
-H "Content-Type: application/vnd.flux" \
-d 'from(bucket: "nodered")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "environment")
|> filter(fn: (r) => r._field == "temperature")'
Terminal window
# 使用 InfluxDB UI 验证数据是否已写入
1. 访问 http://localhost:8086
2. Data Explorer
3. 选择 bucket: nodered
4. 选择 measurement: environment
5. 点击 Submit

预期结果:应看到每 3 秒有一个新的数据点写入,包含 temperature、humidity、lux 等字段。

// Node-RED Debug 节点后添加
// 检查 InfluxDB 输出节点状态
// 输出内容示例:
// msg.payload = [
// {
// "measurement": "environment",
// "tags": {"device_id": "ESP32_001", "location": "Factory Zone 1"},
// "fields": {"temperature": 26.5, "humidity": 62.3, "lux": 450},
// "timestamp": 1697123456000
// }
// ]

症状:Node-RED 中 InfluxDB 节点显示错误

可能原因

  • Token 无效或过期
  • Bucket 名称错误
  • 字段类型不一致

解决方案

  1. 在 InfluxDB UI 中重新生成 Token
  2. 确认 Bucket 名称完全匹配
  3. 检查所有 Field 值是否为数值类型(使用 Number() 转换)

症状:Grafana 仪表板加载缓慢

可能原因

  • 数据量过大没有降采样
  • 查询时间范围过长
  • Tag 基数过高

解决方案

  1. 实施降采样策略(每小时聚合)
  2. 设置合理的时间范围(默认 1 小时)
  3. 避免在 Tag 中使用高基数数据
  • 推荐: 设备 ID 和位置作为 Tag,便于过滤和分组查询
  • 推荐: 所有数值字段显式转换为 Number 类型
  • 推荐: 设置合理的保留策略(原始数据 30 天,聚合数据更长时间)
  • 避免: 将设备 IP、时间戳等变化频繁的数据作为 Tag
  • 避免: 每个字段写入不同的 Measurement(增加查询复杂度)
  • 避免: 写入频率过高导致存储膨胀(根据业务需求调整采样频率)

本节要点总结:

  1. InfluxDB 数据模型:Measurement + Tags + Fields + Timestamp
  2. Schema 设计:合理选择 Tag 和 Field 影响查询性能
  3. Node-RED 集成:使用 InfluxDB Out 节点或 HTTP API 写入
  4. 数据保留:设置保留策略自动清理过期数据
  5. 降采样:长期数据存储需要实施数据聚合