跳转到内容

Join 节点数据聚合

Join 节点数据聚合

本节介绍如何使用 Join 节点聚合多个数据源。学习完成后,您将能够:

  • 理解 Join 节点的作用和配置
  • 聚合来自多个设备或主题的数据
  • 使用 Join 节点合并消息为数组或对象
  • 处理分布式传感器数据的合并

Join 节点将多个消息合并成一个:

[Input 1: temperature] ──┐
[Input 2: humidity] ──┼──→ [Join Node] → [Output: {temp, hum, press}]
[Input 3: pressure] ──┘

在 IoT 场景中,经常需要合并来自不同设备或主题的数据:

场景:
- 一个设备通过多个 Topic 发送不同类型的传感器数据
- 多个设备发送同一指标需要聚合
- 收集完整的一组数据后才进行下一步处理

Join 节点提供三种工作模式:

手动配置合并逻辑:

配置:
├─ Combine each: [count] messages into
├─一条消息(数量模式)
├─或所有消息(全部模式)
└─超时设置:等待 timeout 后强制输出

自动检测输入并合并:

配置:
├─ 模式: Key/Value pairs
├─ Key: msg.topic (默认)
├─ Value: msg.payload
└─ 生成: { topic1: payload1, topic2: payload2 }

将多条消息归约为一条:

配置:
├─ 模式: Reduce
├─ 表达式: JSONata 表达式
└─ 初始值: 初始累加值
25.3
// 传感器设备发送到不同 Topic
// Topic: "device/01/humidity" → payload: 68.2
// Topic: "device/01/pressure" → payload: 1013.25
// Join 节点配置:
// Mode: Automatic
// Key: msg.topic
// Value: msg.payload
// 输出:
{
"device/01/temperature": 25.3,
"device/01/humidity": 68.2,
"device/01/pressure": 1013.25
}
// 等待 3 个消息到达后合并
// Join 节点配置:
// Mode: Manual
// Combine: 3 messages into one
// Timeout: 30 seconds
// 输出:
[
{ topic: "temperature", payload: 25.3 },
{ topic: "humidity", payload: 68.2 },
{ topic: "pressure", payload: 1013.25 }
]
// 使用 Join 节点 + Function 节点构建数据库写入格式
// Join 节点输出:
{
"device/01/temperature": 25.3,
"device/01/humidity": 68.2
}
// Function 节点格式化:
var data = msg.payload;
var deviceId = Object.keys(data)[0].split('/')[1];
msg.payload = {
measurement: "environment",
tags: {
device: deviceId
},
fields: {
temperature: data["device/" + deviceId + "/temperature"],
humidity: data["device/" + deviceId + "/humidity"]
},
timestamp: Date.now() * 1000000
};
return msg;
[Sensor 1: Temperature] ──┐
├──→ [Join] → [Function] → [InfluxDB Out]
[Sensor 2: Humidity] ──┘ │
[Debug: Monitor]

Join 节点配置:

Mode: Manual
Combine: 2 messages into one
Timeout: 10 seconds

Function 节点处理:

// Join 节点输出的数组:
// [{topic: "temp", payload: 25.3}, {topic: "hum", payload: 68.2}]
var combined = {};
msg.payload.forEach(function(item) {
combined[item.topic] = item.payload;
});
// 验证是否所有数据都到达
if (!combined.temp || !combined.hum) {
node.warn("Incomplete data received");
return null;
}
// 格式化 InfluxDB 数据
msg.payload = {
measurement: "environment",
fields: {
temperature: combined.temp,
humidity: combined.hum
}
};
return msg;

Join 节点支持超时设置,防止等待永远不来的消息:

// Join 节点配置
// Timeout: 30 seconds
// Mode: Manual
// Combine: 3 messages into one
// 超时后,如果少于指定数量的消息到达
// Join 节点仍然输出,但只包含已收到的消息
// 在 Function 节点处理超时
var received = msg.payload.length;
var expected = 3;
if (received < expected) {
node.warn("Timeout: only " + received + "/" + expected + " messages received");
// 继续处理已收到的数据
}
方法优势劣势场景
Join 节点可视化配置,超时处理不支持复杂聚合逻辑简单的消息合并
Context 聚合灵活,支持复杂逻辑需要编程,手动管理超时复杂的数据关联
// Context 方式实现复杂聚合
var readBuffer = context.get("readBuffer") || {};
readBuffer[msg.topic] = msg.payload;
context.set("readBuffer", readBuffer);
// 检查是否所有数据都到达
if (readBuffer.temperature && readBuffer.humidity) {
msg.payload = {
temperature: readBuffer.temperature,
humidity: readBuffer.humidity,
timestamp: Date.now()
};
context.set("readBuffer", {}); // 清空缓冲区
return msg;
}
return null; // 等待更多数据

Q1: 如果某些数据源不发送数据怎么办?

Section titled “Q1: 如果某些数据源不发送数据怎么办?”

A: 设置合理的超时时间,超时后处理部分数据。也可以实现重试机制或发送缺失告警。

A: Join 节点不保证顺序。如果需要按顺序合并,建议在消息中添加序号字段,然后在 Function 节点中排序。

Q3: 可以合并来自不同 Tab 的数据吗?

Section titled “Q3: 可以合并来自不同 Tab 的数据吗?”

A: 可以,Join 节点可以连接任何来源的节点。跨 Tab 的数据需要通过 Link In/Out 节点连接。

推荐做法:

  • 根据数据源数量设置合理的合并计数
  • 配置超时机制防止数据阻塞
  • 在 Function 中验证合并数据的完整性
  • 合并后及时清理缓冲区
  • 使用有意义的 Topic 字段便于识别

避免做法:

  • 期望所有数据都必须准时到达
  • 忽略超时场景的处理
  • 合并大量数据导致内存溢出
  • 不验证合并数据的完整性
  1. Join 节点是合并多路数据的首选工具
  2. 支持自动、手动、归约三种模式
  3. 超时机制防止永久等待
  4. 适合传感器数据汇聚和设备状态同步
  5. 复杂聚合场景建议结合 Function 节点