跳转到内容

Node-RED 调度逻辑

Node-RED 调度逻辑

本节介绍如何在 Node-RED 中构建投料调度逻辑的核心流程。这是整个系统的”大脑”——接收 ESP32 数据,查询数据库,计算时间差,决定是否投料,并返回指令。学习完成后,您将能够:

  • 构建完整的 Node-RED 投料调度流程
  • 实现数据库驱动的投料决策逻辑
  • 处理多路投料的并行调度
┌──────────────────────────────────────────────────────────────┐
│ Node-RED 投料调度流程 │
├──────────────────────────────────────────────────────────────┤
│ │
│ [MQTT In: esp32/dosing/info] ←── ESP32 唤醒后发送液位数据 │
│ │ │
│ ├──→ [Function: 解析数据并保存到 Flow Context] │
│ │ │
│ ├──→ [Function: 构建 SQL 查询上次投料时间] │
│ │ │ │
│ │ └──→ [MariaDB: SELECT] │
│ │ │ │
│ │ ├──→ [Function: 时间差计算] │
│ │ │ │ │
│ │ │ ├──→ 需要投料? │
│ │ │ │ ├── 是 → MQTT: 1 │
│ │ │ │ └── 否 → MQTT: 0 │
│ │ │ │ │
│ │ └──→ [MQTT Out: esp32/dosing/control]│
│ │ │
│ ├──→ [Function: 构建 INSERT 液位数据] │
│ │ │ │
│ │ └──→ [MariaDB: INSERT] │
│ │ │
│ └──→ [Function: 检查液位 → 告警通知] │
│ │
└──────────────────────────────────────────────────────────────┘

通过 Adminer 创建数据库和表:

-- 创建数据库
CREATE DATABASE IF NOT EXISTS dosing_system
CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE dosing_system;
-- 1. 液位历史表
CREATE TABLE IF NOT EXISTS level_history (
id INT AUTO_INCREMENT PRIMARY KEY,
timestamp INT NOT NULL,
distance1 INT,
distance2 INT,
level1 INT,
level2 INT,
dosing_triggered BOOLEAN DEFAULT FALSE
);
-- 2. 深度睡眠记录表 (用于监控设备健康状况)
CREATE TABLE IF NOT EXISTS deep_sleep_records (
id INT AUTO_INCREMENT PRIMARY KEY,
timestamp INT NOT NULL,
boot_count INT DEFAULT 0,
total_dosing INT DEFAULT 0
);
-- 3. 投料计划表 (可远程配置)
CREATE TABLE IF NOT EXISTS dosing_schedule (
id INT AUTO_INCREMENT PRIMARY KEY,
zone VARCHAR(50) NOT NULL,
interval_seconds INT NOT NULL DEFAULT 86400,
duration_ms INT NOT NULL DEFAULT 3000,
enabled BOOLEAN DEFAULT TRUE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入默认投料计划
INSERT INTO dosing_schedule (zone, interval_seconds, duration_ms)
VALUES ('container_1', 86400, 3000);
-- 86400 秒 = 24 小时,每天投料一次

安装 MariaDB 节点:

Terminal window
# Palette 安装
node-red-contrib-mysql2
# 或
node-red-node-mysql

MariaDB 节点配置:

参数
Hostmariadb (Docker 服务名) 或 IP
Port3306
Databasedosing_system
Userroot
Passwordyour_password
// Function: 查询上次投料时间
// 从 level_history 表中获取最新的记录
// 构建 SQL 查询
msg.topic = "SELECT timestamp FROM level_history " +
"ORDER BY id DESC LIMIT 1";
// 可选: 按区域查询
var zone = msg.payload.zone || "container_1";
// msg.topic = "SELECT timestamp FROM level_history " +
// "WHERE zone = '" + zone + "' " +
// "ORDER BY id DESC LIMIT 1";
return msg;
// Function: 计算时间差并决定是否投料
// 输入: msg.payload = SELECT 查询结果
// 输出: MQTT 控制指令 (1 或 0)
// 配置投料间隔 (秒)
var DOSING_INTERVAL = 86400; // 24 小时
// 可从 dosing_schedule 表动态获取
// 从数据库获取上次投料时间
var lastTimestamp = 0;
if (msg.payload && msg.payload.length > 0) {
lastTimestamp = msg.payload[0].timestamp;
}
// 当前时间 (Unix 秒)
var currentTime = Math.floor(Date.now() / 1000);
var timeDiff = currentTime - lastTimestamp;
node.log("Last dosing: " + lastTimestamp +
" (" + (lastTimestamp ? new Date(lastTimestamp * 1000).toLocaleString() : "never") + ")");
node.log("Current time: " + currentTime);
node.log("Time diff: " + timeDiff + "s (required: " + DOSING_INTERVAL + "s)");
// 决策
var shouldDose = 0;
if (lastTimestamp === 0) {
// 首次运行,执行投料
shouldDose = 1;
node.log("Decision: First run, dosing needed");
} else if (timeDiff >= DOSING_INTERVAL) {
// 距离上次投料超过间隔
shouldDose = 1;
node.log("Decision: Interval elapsed, dosing needed");
} else {
var remaining = DOSING_INTERVAL - timeDiff;
node.log("Decision: Not yet, " +
Math.round(remaining / 3600) + "h remaining");
}
// 发送控制指令
msg.payload = String(shouldDose); // MQTT 需要字符串
msg.topic = "esp32/dosing/control";
msg.qos = 1;
return msg;
// Function: 构建 INSERT 液位数据 SQL
var data = flow.get("lastSensorData") || {};
var currentTime = Math.floor(Date.now() / 1000);
msg.topic = "INSERT INTO level_history " +
"(timestamp, distance1, level1, dosing_triggered) " +
"VALUES (" +
currentTime + ", " +
(data.distance || 0) + ", " +
(data.level || 0) + ", " +
(flow.get("dosingTriggered") ? 1 : 0) +
")";
return msg;
{
"id": "dosing-scheduler",
"name": "自动投料调度",
"nodes": [
{
"id": "mqtt-in",
"type": "mqtt in",
"name": "ESP32 数据",
"topic": "esp32/dosing/info",
"qos": "1",
"broker": "local-broker"
},
{
"id": "parse-data",
"type": "function",
"name": "解析传感器数据",
"func": "var data = msg.payload;\nflow.set(\"lastSensorData\", data);\nflow.set(\"lastWakeTime\", Date.now());\nreturn msg;"
},
{
"id": "build-select",
"type": "function",
"name": "查询上次投料",
"func": "msg.topic = \"SELECT timestamp FROM level_history ORDER BY id DESC LIMIT 1\";\nreturn msg;"
},
{
"id": "mariadb-select",
"type": "mysql",
"name": "MariaDB",
"host": "mariadb",
"port": "3306",
"db": "dosing_system",
"user": "root"
},
{
"id": "time-logic",
"type": "function",
"name": "时间差计算",
"func": "// 时间差计算逻辑 (见上文)"
},
{
"id": "mqtt-control",
"type": "mqtt out",
"name": "投料指令",
"topic": "esp32/dosing/control",
"qos": "1",
"broker": "local-broker"
}
]
}
// Node-RED Function: 动态配置投料计划
// 通过 Dashboard UI 调整投料间隔
// 接收 Dashboard 输入
var interval = msg.payload.interval_hours; // 小时数
var zone = msg.payload.zone;
// 转换为秒
var intervalSeconds = interval * 3600;
// 更新 dosing_schedule 表
msg.topic = "UPDATE dosing_schedule SET " +
"interval_seconds = " + intervalSeconds + ", " +
"updated_at = CURRENT_TIMESTAMP " +
"WHERE zone = '" + zone + "'";
return msg;
┌──────────────────────────────────────────────┐
│ 投料调度配置 │
├──────────────────────────────────────────────┤
│ │
│ 区域 1: 原料容器 A │
│ 投料间隔: [24] 小时 [保存] │
│ 每次投料: [3] 秒 [保存] │
│ ├── 上次投料: 2026-05-18 14:30 │
│ └── 下次投料: 2026-05-19 14:30 │
│ │
│ 区域 2: 原料容器 B │
│ 投料间隔: [48] 小时 [保存] │
│ 每次投料: [5] 秒 [保存] │
│ │
│ [手动触发投料 A] [手动触发投料 B] │
│ │
└──────────────────────────────────────────────┘
Terminal window
# 1. 模拟 ESP32 唤醒
mosquitto_pub -t "esp32/dosing/info" \
-m '{"distance":16,"level":33,"boot":5}'
# 2. 检查 Node-RED 调试输出
# 应看到:
# Last dosing: 1716013825 (2026-05-18, 14:30:25)
# Current time: 1716017425
# Time diff: 3600s (required: 86400s)
# Decision: Not yet, 23h remaining
# 3. 检查 MQTT 控制指令
mosquitto_sub -t "esp32/dosing/control" -v
# 初始: esp32/dosing/control 0 (间隔未到)
# 手动触发后: esp32/dosing/control 1
# 4. 验证数据库记录
# 通过 Adminer 或 Node-RED 查询 level_history 表

Q1: 为什么不在 ESP32 上直接做时间判断?

Section titled “Q1: 为什么不在 ESP32 上直接做时间判断?”
  1. 灵活性: Node-RED 修改调度无需烧录 ESP32
  2. 持久性: MariaDB 存储投料历史,ESP32 睡眠状态不保存
  3. 可追溯: 所有数据存储在数据库中,便于分析和审计
  4. 多路联动: Node-RED 可以处理复杂的”如果 A 投料了则 B 延迟”等逻辑

Q2: 如何实现”每天上午 8 点投料”?

Section titled “Q2: 如何实现”每天上午 8 点投料”?”

修改 Node-RED 时间逻辑:

// Function: 固定时间投料
var currentHour = new Date().getHours();
if (currentHour === 8 && flow.get("todayDosed") !== true) {
// 上午 8 点,进行投料
msg.payload = "1";
flow.set("todayDosed", true);
} else {
msg.payload = "0";
}
// 每天 0 点重置投料标记
if (currentHour === 0) {
flow.set("todayDosed", false);
}

在 MariaDB 中为每个区域创建独立的配置记录,Node-RED 根据 MQTT 消息中的 zone 字段动态查询对应配置。

推荐做法:

  • 将调度逻辑全部放在 Node-RED,ESP32 只执行简单指令
  • 使用 MariaDB 持久化投料历史,确保掉电不丢失
  • 为每个区域创建独立的配置
  • 添加手动投料功能用于测试和应急
  • Dashboard 中显示下次投料倒计时

避免做法:

  • 在 ESP32 代码中使用固定 delay 做时间判断
  • 忽略数据库查询错误处理
  • SQL 注入风险 (用户输入的间隔值需要验证)
  • 多个 ESP32 节点共用调度逻辑时未隔离区域
  1. 查询-响应模式: ESP32 → MQTT → Node-RED → MariaDB → 决策
  2. 时间差计算: 比较当前时间和上次投料时间
  3. 动态配置: 通过 dosing_schedule 表远程调整投料间隔
  4. 持久化存储: 液位数据存入 level_history 表
  5. 灵活性: 修改调度策略无需触 ESP32 固件