Join 节点数据聚合
Join 节点数据聚合
本节介绍如何使用 Join 节点聚合多个数据源。学习完成后,您将能够:
- 理解 Join 节点的作用和配置
- 聚合来自多个设备或主题的数据
- 使用 Join 节点合并消息为数组或对象
- 处理分布式传感器数据的合并
Join Node Overview
Section titled “Join Node Overview”Join 节点将多个消息合并成一个:
[Input 1: temperature] ──┐[Input 2: humidity] ──┼──→ [Join Node] → [Output: {temp, hum, press}][Input 3: pressure] ──┘为什么需要 Join 节点?
Section titled “为什么需要 Join 节点?”在 IoT 场景中,经常需要合并来自不同设备或主题的数据:
场景:- 一个设备通过多个 Topic 发送不同类型的传感器数据- 多个设备发送同一指标需要聚合- 收集完整的一组数据后才进行下一步处理Configuration Modes
Section titled “Configuration Modes”Join 节点提供三种工作模式:
Mode 1: Manual (手动模式)
Section titled “Mode 1: Manual (手动模式)”手动配置合并逻辑:
配置:├─ Combine each: [count] messages into├─一条消息(数量模式)├─或所有消息(全部模式)└─超时设置:等待 timeout 后强制输出Mode 2: Automatic (自动模式)
Section titled “Mode 2: Automatic (自动模式)”自动检测输入并合并:
配置:├─ 模式: Key/Value pairs├─ Key: msg.topic (默认)├─ Value: msg.payload└─ 生成: { topic1: payload1, topic2: payload2 }Mode 3: Reduce (归约模式)
Section titled “Mode 3: Reduce (归约模式)”将多条消息归约为一条:
配置:├─ 模式: Reduce├─ 表达式: JSONata 表达式└─ 初始值: 初始累加值Usage Examples
Section titled “Usage Examples”Example 1: 多 Topic 数据合并
Section titled “Example 1: 多 Topic 数据合并”// 传感器设备发送到不同 Topic// Topic: "device/01/humidity" → payload: 68.2// Topic: "device/01/pressure" → payload: 1013.25
// Join 节点配置:// Mode: Automatic// Key: msg.topic// Value: msg.payload
// 输出:{ "device/01/temperature": 25.3, "device/01/humidity": 68.2, "device/01/pressure": 1013.25}Example 2: 计数合并
Section titled “Example 2: 计数合并”// 等待 3 个消息到达后合并
// Join 节点配置:// Mode: Manual// Combine: 3 messages into one// Timeout: 30 seconds
// 输出:[ { topic: "temperature", payload: 25.3 }, { topic: "humidity", payload: 68.2 }, { topic: "pressure", payload: 1013.25 }]Example 3: InfluxDB 格式化输出
Section titled “Example 3: InfluxDB 格式化输出”// 使用 Join 节点 + Function 节点构建数据库写入格式
// Join 节点输出:{ "device/01/temperature": 25.3, "device/01/humidity": 68.2}
// Function 节点格式化:var data = msg.payload;var deviceId = Object.keys(data)[0].split('/')[1];
msg.payload = { measurement: "environment", tags: { device: deviceId }, fields: { temperature: data["device/" + deviceId + "/temperature"], humidity: data["device/" + deviceId + "/humidity"] }, timestamp: Date.now() * 1000000};
return msg;Practical IoT Scenario
Section titled “Practical IoT Scenario”完整 Flow: 车间环境监控
Section titled “完整 Flow: 车间环境监控”[Sensor 1: Temperature] ──┐ ├──→ [Join] → [Function] → [InfluxDB Out][Sensor 2: Humidity] ──┘ │ [Debug: Monitor]Join 节点配置:
Mode: ManualCombine: 2 messages into oneTimeout: 10 secondsFunction 节点处理:
// Join 节点输出的数组:// [{topic: "temp", payload: 25.3}, {topic: "hum", payload: 68.2}]
var combined = {};msg.payload.forEach(function(item) { combined[item.topic] = item.payload;});
// 验证是否所有数据都到达if (!combined.temp || !combined.hum) { node.warn("Incomplete data received"); return null;}
// 格式化 InfluxDB 数据msg.payload = { measurement: "environment", fields: { temperature: combined.temp, humidity: combined.hum }};
return msg;Handling Timeout
Section titled “Handling Timeout”Join 节点支持超时设置,防止等待永远不来的消息:
// Join 节点配置// Timeout: 30 seconds// Mode: Manual// Combine: 3 messages into one
// 超时后,如果少于指定数量的消息到达// Join 节点仍然输出,但只包含已收到的消息
// 在 Function 节点处理超时var received = msg.payload.length;var expected = 3;
if (received < expected) { node.warn("Timeout: only " + received + "/" + expected + " messages received"); // 继续处理已收到的数据}Join vs. Context Aggregation
Section titled “Join vs. Context Aggregation”| 方法 | 优势 | 劣势 | 场景 |
|---|---|---|---|
| Join 节点 | 可视化配置,超时处理 | 不支持复杂聚合逻辑 | 简单的消息合并 |
| Context 聚合 | 灵活,支持复杂逻辑 | 需要编程,手动管理超时 | 复杂的数据关联 |
// Context 方式实现复杂聚合var readBuffer = context.get("readBuffer") || {};
readBuffer[msg.topic] = msg.payload;context.set("readBuffer", readBuffer);
// 检查是否所有数据都到达if (readBuffer.temperature && readBuffer.humidity) { msg.payload = { temperature: readBuffer.temperature, humidity: readBuffer.humidity, timestamp: Date.now() }; context.set("readBuffer", {}); // 清空缓冲区 return msg;}
return null; // 等待更多数据Common Customer Questions
Section titled “Common Customer Questions”Q1: 如果某些数据源不发送数据怎么办?
Section titled “Q1: 如果某些数据源不发送数据怎么办?”A: 设置合理的超时时间,超时后处理部分数据。也可以实现重试机制或发送缺失告警。
Q2: 合并数据的顺序重要吗?
Section titled “Q2: 合并数据的顺序重要吗?”A: Join 节点不保证顺序。如果需要按顺序合并,建议在消息中添加序号字段,然后在 Function 节点中排序。
Q3: 可以合并来自不同 Tab 的数据吗?
Section titled “Q3: 可以合并来自不同 Tab 的数据吗?”A: 可以,Join 节点可以连接任何来源的节点。跨 Tab 的数据需要通过 Link In/Out 节点连接。
✅ 推荐做法:
- 根据数据源数量设置合理的合并计数
- 配置超时机制防止数据阻塞
- 在 Function 中验证合并数据的完整性
- 合并后及时清理缓冲区
- 使用有意义的 Topic 字段便于识别
❌ 避免做法:
- 期望所有数据都必须准时到达
- 忽略超时场景的处理
- 合并大量数据导致内存溢出
- 不验证合并数据的完整性
Summary
Section titled “Summary”- Join 节点是合并多路数据的首选工具
- 支持自动、手动、归约三种模式
- 超时机制防止永久等待
- 适合传感器数据汇聚和设备状态同步
- 复杂聚合场景建议结合 Function 节点