跳转到内容

JSON 数据处理

JSON 数据处理

本节介绍如何在 Node-RED 中处理 JSON 数据。学习完成后,您将能够:

  • 解析和构建 JSON 数据
  • 访问嵌套 JSON 对象的属性
  • 处理来自 API 和 MQTT 的 JSON 数据
  • 使用 JSON 节点简化解析操作

JSON (JavaScript Object Notation) 是 IoT 场景中最常用的数据格式:

{
"device": "SENSOR-01",
"type": "temperature_sensor",
"location": "factory-a",
"readings": {
"temperature": 25.3,
"humidity": 68.2,
"pressure": 1013.25
},
"timestamp": "2026-05-18T10:30:00Z"
}
特性说明IoT 价值
通用性几乎所有语言和平台都支持跨平台通信
可读性人类可读的文本格式便于排错
结构化支持嵌套和复杂结构表达复杂数据
轻量级相比 XML 更精简减少网络传输

Node-RED 内置了 JSON 节点,简化 JSON 解析操作:

[JSON Node]
├─ JSON to Object (字符串 → 对象)
└─ Object to JSON (对象 → 字符串)
{
"property": "payload",
"action": "str2obj" // 字符串转对象
// 或 "obj2str" // 对象转字符串
}

ESP32 通过 MQTT 发送 JSON 数据是常见场景:

// ESP32 Arduino 代码
void loop() {
StaticJsonDocument<256> doc;
doc["device"] = "SENSOR-01";
doc["temperature"] = dht.readTemperature();
doc["humidity"] = dht.readHumidity();
doc["timestamp"] = millis();
char buffer[256];
serializeJson(doc, buffer);
client.publish("sensor/data", buffer);
delay(5000);
}

方法 1: 使用 JSON 节点

[MQTT In: sensor/data] → [JSON: str→obj] → [Function: 处理]
[InfluxDB Out]

方法 2: 在 Function 节点中解析

// Function 节点中直接解析 JSON
try {
var data = JSON.parse(msg.payload);
// 访问解析后的数据
var device = data.device;
var temp = data.temperature;
var hum = data.humidity;
msg.payload = {
device: device,
temperature: temp,
humidity: hum
};
return msg;
} catch (e) {
node.error("JSON parse failed: " + e.message);
return null;
}
{
"factory": {
"name": "Factory A",
"lines": [
{
"name": "Line 1",
"sensors": {
"temperature": 25.3,
"humidity": 68.2
}
}
]
},
"metadata": {
"timestamp": "2026-05-18T10:30:00Z",
"version": "2.1.0"
}
}

访问嵌套属性:

// Function 节点
var factory = msg.payload;
// 点操作符访问
var factoryName = factory.factory.name;
var temperature = factory.factory.lines[0].sensors.temperature;
// 安全访问(带默认值)
var temp = factory?.factory?.lines?.[0]?.sensors?.temperature || 0;
// 可选链操作
var version = factory?.metadata?.version || "unknown";
msg.payload = {
factory: factoryName,
temperature: temperature,
version: version
};
return msg;
// 从 MQTT 数据构建 InfluxDB 写入格式
var sensorData = msg.payload;
// 构建 InfluxDB 点格式
var influxPoint = {
measurement: sensorData.type || "sensor_data",
tags: {
device: sensorData.device || "unknown",
location: sensorData.location || "unknown"
},
fields: {
temperature: Number(sensorData.temperature),
humidity: Number(sensorData.humidity),
pressure: Number(sensorData.pressure || 0)
},
timestamp: Date.now() * 1000000 // 纳秒
};
msg.payload = influxPoint;
return msg;
// 构建用于 HTTP 请求的 JSON
var statusData = {
device: msg.topic.split('/')[1],
timestamp: new Date().toISOString(),
readings: {
temperature: Number(msg.payload),
unit: "celsius"
}
};
msg.payload = JSON.stringify(statusData);
msg.headers = {
"Content-Type": "application/json"
};
return msg;
// 输入: { "temp": 25.3, "hum": 68.2 }
// 输出: { "temperature": 25.3, "humidity": 68.2 }
var input = msg.payload;
msg.payload = {
temperature: input.temp,
humidity: input.hum,
timestamp: Date.now()
};
return msg;
// 输入: 多个 MQTT 消息
// {"device": "SENSOR-01", "temperature": 25.3}
// {"device": "SENSOR-01", "humidity": 68.2}
// 合并为一个完整 JSON
var combined = context.get("combined") || {};
combined[msg.topic] = msg.payload;
context.set("combined", combined);
// 当所有数据都到达时发送
if (combined.temperature && combined.humidity) {
msg.payload = {
device: "SENSOR-01",
temperature: combined.temperature,
humidity: combined.humidity,
timestamp: Date.now()
};
context.set("combined", {});
return msg;
}
return null; // 等待更多数据
// 输入: [{temp: 25.3}, {temp: 25.5}, {temp: 25.1}]
// 输出: {avg: 25.3, min: 25.1, max: 25.5}
var readings = msg.payload;
var temps = readings.map(r => r.temp);
msg.payload = {
average: (temps.reduce((a, b) => a + b, 0) / temps.length).toFixed(1),
minimum: Math.min(...temps),
maximum: Math.max(...temps),
count: temps.length
};
return msg;
// 健壮的 JSON 处理
function processJSON(msg) {
try {
// 处理字符串类型
if (typeof msg.payload === 'string') {
msg.payload = JSON.parse(msg.payload);
}
// 验证必要字段
if (!msg.payload.device) {
throw new Error("Missing device field");
}
// 处理数据
msg.payload.value = Number(msg.payload.value);
return msg;
} catch (error) {
// 记录错误
node.error("JSON processing error: " + error.message);
node.warn("Original message: " + JSON.stringify(msg.payload));
// 返回空(丢弃消息)
return null;
}
}
return processJSON(msg);

A: 使用可选链操作符 ?. 安全访问,或使用 Lodash 的 _.get() 方法。避免深层嵌套。

A: 使用 try/catch 捕获解析错误,记录原始消息,并添加重试或通知机制。

A: 使用 Split 节点拆分数组,分批处理。Function 节点中避免一次性加载超大 JSON。

推荐做法:

  • 使用 JSON 节点简化解析操作
  • 嵌套对象访问添加默认值保护
  • 验证 JSON 结构的完整性
  • 使用 try/catch 处理解析错误
  • 保持 JSON 结构扁平化

避免做法:

  • 忽略 JSON 解析失败的处理
  • 嵌套层级过深(建议不超过 3 层)
  • 在单个节点处理超大 JSON
  • 不验证来源数据的合法性
  • 直接信任外部 JSON 数据
  1. JSON 是 IoT 通信的标准数据格式
  2. JSON 节点提供快速的格式转换
  3. Function 节点处理复杂的 JSON 操作
  4. 可选链操作符安全访问嵌套属性
  5. 错误处理是 JSON 处理的必备环节