InfluxDB 数据存储
InfluxDB 数据存储
本节介绍如何将处理后的环境数据存储到 InfluxDB 时序数据库中。学习完成后,您将能够:
- 理解 InfluxDB 的核心概念(Measurement、Tag、Field、Bucket)
- 在 Node-RED 中配置 InfluxDB 输出节点
- 设计高效的时序数据存储结构
- 配置数据保留策略和查询优化
在开始本节之前,请确保:
- InfluxDB 2.x 已安装并运行
- Node-RED InfluxDB 节点已安装
- 已完成数据处理 Flow
- 了解 InfluxDB 的基本概念
InfluxDB Core Concepts
Section titled “InfluxDB Core Concepts”Data Model
Section titled “Data Model”InfluxDB 是一种专为时序数据优化的数据库,其数据模型包含以下核心概念:
┌─────────────────────────────────────────────────────┐│ Measurement ││ (类似 SQL 表名) │├─────────────────────────────────────────────────────┤│ Tags (标签) │ Fields (字段) ││ - 索引字段 │ - 实际数值 ││ - 用于过滤和分组 │ - 不建立索引 ││ - 如 device_id │ - 如 temperature │├─────────────────────────────────────────────────────┤│ Timestamp (时间戳) ││ - 默认纳秒精度 ││ - 自动生成或指定 │└─────────────────────────────────────────────────────┘Key Terminology
Section titled “Key Terminology”| 概念 | 说明 | 类比 SQL |
|---|---|---|
| Bucket | 数据存储桶,包含保留策略 | Database |
| Measurement | 测量项名 | Table |
| Tag | 带索引的元数据 | Indexed Column |
| Field | 实际数值 | Non-Indexed Column |
| Point | 单条数据记录 | Row |
| Series | 相同 measurement+tag 的集合 | Group |
| Retention Policy | 数据自动删除策略 | TTL |
Data Schema Design Principles
Section titled “Data Schema Design Principles”✅ GOOD Schema Design:measurement="environment"tags: device_id="ESP32_001", location="zone1"fields: temperature=26.5, humidity=62.3, lux=450timestamp: 1697123456000000000
❌ BAD Schema Design:measurement="zone1_temperature"tags: device="ESP32_001"fields: value=26.5timestamp: 1697123456000000000原则:高基数数据作为 Tag,变化频繁的数值作为 Field
InfluxDB Setup in Node-RED
Section titled “InfluxDB Setup in Node-RED”Step 1: Install InfluxDB Node
Section titled “Step 1: Install InfluxDB Node”1. 打开 Node-RED → 右上角菜单 → Manage Palette2. 选择 Install 标签3. 搜索 "node-red-contrib-influxdb"4. 安装最新版本Step 2: Configure InfluxDB Server
Section titled “Step 2: Configure InfluxDB Server”1. 从左侧节点面板拖入 "influxdb" 节点2. 双击配置: - Name: InfluxDB 2.x - Version: 2.0 - URL: http://influxdb:8086 - Token: [从 InfluxDB UI 获取] - Organization: organization - Bucket: nodered - Skip SSL Verification: true (仅开发环境)3. 点击 Add 保存获取 InfluxDB Token:
# 通过 InfluxDB UI 获取1. 访问 http://localhost:80862. 登录 → Data → API Tokens3. 点击 Generate → Custom API Token4. 选择读写权限 → 生成 TokenStep 3: Configure InfluxDB Out Node
Section titled “Step 3: Configure InfluxDB Out Node”1. 拖入 "influxdb out" 节点2. 配置: - Server: 选择刚配置的 InfluxDB 连接 - Measurement: environment - Organization: organization - Bucket: nodered - Precision: millisecondsData Storage Implementation
Section titled “Data Storage Implementation”Using InfluxDB Out Node
Section titled “Using InfluxDB Out Node”最简单的存储方式——直接使用 InfluxDB Out 节点:
// 在 InfluxDB Out 节点前添加 Function 节点// 构建 InfluxDB 兼容的数据格式
var data = msg.payload;
// 构建 InfluxDB 数据点对象msg.payload = [{ measurement: "environment", tags: { device_id: data.device_id || "unknown", location: data.location || "unknown", sensor_type: "environmental" }, fields: { temperature: Number(data.temperature.value), humidity: Number(data.humidity.value), lux: Number(data.lux.value), signal_rssi: data.quality ? Number(data.quality.signal_rssi) : -100 }, timestamp: Date.now()}];
return msg;Using HTTP API (替代方案)
Section titled “Using HTTP API (替代方案)”// 使用 HTTP Request 节点直接写入 InfluxDB API// HTTP Request 配置:// URL: http://influxdb:8086/api/v2/write?org=organization&bucket=nodered&precision=ms// Method: POST// Headers:// Authorization: Token YOUR_INFLUXDB_TOKEN// Content-Type: text/plain
// Function 节点构建 Line Protocol 字符串var data = msg.payload;
// Line Protocol 格式: measurement,tags fields timestampvar line = "environment," + "device_id=" + data.device_id + "," + "location=" + data.location + "," + "sensor_type=environmental " + "temperature=" + Number(data.temperature.value) + "," + "humidity=" + Number(data.humidity.value) + "," + "lux=" + Number(data.lux.value) + " " + Date.now();
msg.payload = line;msg.headers = { "Authorization": "Token YOUR_INFLUXDB_TOKEN", "Content-Type": "text/plain"};
return msg;Data Schema Design
Section titled “Data Schema Design”Recommended Schema
Section titled “Recommended Schema”Measurement: environment
Tags: device_id - 设备唯一标识(如 "ESP32_001") location - 安装位置(如 "Factory Zone 1") sensor_type - 传感器类型(如 "environmental") firmware_ver - 固件版本(如 "2.1.0")
Fields: temperature - 温度值 (float) humidity - 湿度百分比 (float) lux - 光照值 (float) feels_like - 体感温度 (float) signal_rssi - WiFi 信号强度 (integer)
Timestamp: Unix timestamp (毫秒精度)Schema Design Considerations
Section titled “Schema Design Considerations”| 设计要点 | 推荐做法 | 原因 |
|---|---|---|
| Tag 选择 | 选择基数低、查询频繁的字段 | Tag 有索引,查询更快 |
| Field 选择 | 数值型测量数据 | Field 无索引,适合存储 |
| Measurement 命名 | 小写、下划线分隔 | InfluxDB 命名规范 |
| Tag 值 | 固定值或有限枚举 | 高基数 Tag 影响性能 |
| 时间戳精度 | 毫秒或纳秒 | 满足 IoT 数据的精度需求 |
Data Retention Policy
Section titled “Data Retention Policy”Configure Retention in InfluxDB
Section titled “Configure Retention in InfluxDB”# 通过 InfluxDB CLI 或 UI 配置数据保留策略# 保留 30 天的原始数据# 之后自动删除InfluxDB UI 设置:
1. 访问 http://localhost:80862. Load Data → Buckets3. 选择 "nodered" bucket4. 点击 "Update Retention"5. 设置 "30d" (30 天)Downsampling Strategy
Section titled “Downsampling Strategy”对于长期数据存储,建议实施降采样:
// 持续查询:每小时聚合数据// 在 InfluxDB 中创建 Task
// Function 节点:写入原始数据的同时写入聚合msg.aggregate = { measurement: "environment_1h", tags: { device_id: data.device_id, location: data.location }, fields: { temp_avg: data.temperature.value, temp_min: data.temperature.value, temp_max: data.temperature.value, sample_count: 1 }, timestamp: Math.floor(Date.now() / 3600000) * 3600000};Data Query Examples
Section titled “Data Query Examples”Using InfluxDB UI Data Explorer
Section titled “Using InfluxDB UI Data Explorer”// 查询最近 1 小时的环境数据from(bucket: "nodered") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "environment") |> filter(fn: (r) => r.location == "Factory Zone 1") |> yield(name: "last_hour")// 查询温度均值和最大值(按小时聚合)from(bucket: "nodered") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "environment") |> filter(fn: (r) => r._field == "temperature") |> aggregateWindow(every: 1h, fn: mean) |> yield(name: "hourly_avg")Using HTTP API (REST)
Section titled “Using HTTP API (REST)”# 查询最近 5 分钟的温度数据curl -X POST http://localhost:8086/api/v2/query?org=organization \ -H "Authorization: Token YOUR_TOKEN" \ -H "Content-Type: application/vnd.flux" \ -d 'from(bucket: "nodered") |> range(start: -5m) |> filter(fn: (r) => r._measurement == "environment") |> filter(fn: (r) => r._field == "temperature")'验证数据写入
Section titled “验证数据写入”# 使用 InfluxDB UI 验证数据是否已写入1. 访问 http://localhost:80862. Data Explorer3. 选择 bucket: nodered4. 选择 measurement: environment5. 点击 Submit预期结果:应看到每 3 秒有一个新的数据点写入,包含 temperature、humidity、lux 等字段。
写入验证脚本
Section titled “写入验证脚本”// Node-RED Debug 节点后添加// 检查 InfluxDB 输出节点状态
// 输出内容示例:// msg.payload = [// {// "measurement": "environment",// "tags": {"device_id": "ESP32_001", "location": "Factory Zone 1"},// "fields": {"temperature": 26.5, "humidity": 62.3, "lux": 450},// "timestamp": 1697123456000// }// ]Issue 1: InfluxDB 写入失败
Section titled “Issue 1: InfluxDB 写入失败”症状:Node-RED 中 InfluxDB 节点显示错误
可能原因:
- Token 无效或过期
- Bucket 名称错误
- 字段类型不一致
解决方案:
- 在 InfluxDB UI 中重新生成 Token
- 确认 Bucket 名称完全匹配
- 检查所有 Field 值是否为数值类型(使用
Number()转换)
Issue 2: 数据查询缓慢
Section titled “Issue 2: 数据查询缓慢”症状:Grafana 仪表板加载缓慢
可能原因:
- 数据量过大没有降采样
- 查询时间范围过长
- Tag 基数过高
解决方案:
- 实施降采样策略(每小时聚合)
- 设置合理的时间范围(默认 1 小时)
- 避免在 Tag 中使用高基数数据
- ✅ 推荐: 设备 ID 和位置作为 Tag,便于过滤和分组查询
- ✅ 推荐: 所有数值字段显式转换为 Number 类型
- ✅ 推荐: 设置合理的保留策略(原始数据 30 天,聚合数据更长时间)
- ❌ 避免: 将设备 IP、时间戳等变化频繁的数据作为 Tag
- ❌ 避免: 每个字段写入不同的 Measurement(增加查询复杂度)
- ❌ 避免: 写入频率过高导致存储膨胀(根据业务需求调整采样频率)
Summary
Section titled “Summary”本节要点总结:
- InfluxDB 数据模型:Measurement + Tags + Fields + Timestamp
- Schema 设计:合理选择 Tag 和 Field 影响查询性能
- Node-RED 集成:使用 InfluxDB Out 节点或 HTTP API 写入
- 数据保留:设置保留策略自动清理过期数据
- 降采样:长期数据存储需要实施数据聚合