告警系统
告警系统
本节介绍自动投料系统的告警系统设计,包括低液位告警、设备故障告警和异常检测。学习完成后,您将能够:
- 设计多级告警规则
- 实现 Telegram/Email 告警通知
- 配置告警抑制和升级机制
- 处理传感器故障和设备离线告警
Alert Architecture
Section titled “Alert Architecture”
┌──────────────────────────────────────────────────────────────┐
│                         告警系统架构                         │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  [告警源]                                                    │
│    ├── 液位过低 (level < 20%)                                │
│    ├── 容器缺水 (level < 5%)                                 │
│    ├── 液位快速下降 (趋势异常)                               │
│    ├── 传感器故障 (measurement timeout)                      │
│    ├── 设备离线 (唤醒间隔 > 2 小时)                          │
│    └── 投料异常 (投料后液位无变化)                           │
│                                                              │
│      ▼                                                       │
│  [Node-RED 告警引擎]                                         │
│                                                              │
│    ├── 1. 告警级别判定 (Info/Warning/Critical)               │
│    ├── 2. 告警抑制 (同类型 5 分钟内不重复)                   │
│    ├── 3. 告警升级 (连续触发 3 次 → 升级)                    │
│    └── 4. 通知路由                                           │
│                                                              │
│      ▼                                                       │
│  [通知通道]                                                  │
│    ├── Telegram Bot (即时推送)                               │
│    ├── Email (正式通知)                                      │
│    └── Dashboard (状态显示)                                  │
│                                                              │
└──────────────────────────────────────────────────────────────┘
Alert Rules Definition
Section titled “Alert Rules Definition”// Node-RED Function: 告警规则引擎 (shared config)
// Alert rule table. Each rule carries:
//   enabled         - whether the rule is evaluated
//   level           - severity label attached to the alert
//   threshold       - rule-specific trigger value (see the per-rule notes)
//   message         - notification template; {name} placeholders are filled in
//                     by the code that raises the alert
//   suppressSeconds - minimum gap between two notifications of the same rule
var SECONDS_PER_MINUTE = 60;
var SECONDS_PER_HOUR = 60 * SECONDS_PER_MINUTE;

var alertRules = {
    // --- Level alerts ---
    low_level: {
        enabled: true,
        level: "warning",
        threshold: 20, // level below 20 %
        message: "⚠️ 液位偏低: {level}% (容器 {container})",
        suppressSeconds: 5 * SECONDS_PER_MINUTE
    },
    critical_level: {
        enabled: true,
        level: "critical",
        threshold: 5, // level below 5 %
        message: "🔴 液位过低: {level}% - 请立即补充原料!",
        suppressSeconds: 10 * SECONDS_PER_MINUTE
    },

    // --- Trend alerts ---
    rapid_drop: {
        enabled: true,
        level: "critical",
        threshold: 15, // drop in level (percentage points) between two wake-ups
        message: "🔴 液位快速下降: 从 {from}% 降至 {to}% - 可能泄漏!",
        suppressSeconds: 30 * SECONDS_PER_MINUTE
    },

    // --- Device alerts ---
    sensor_failure: {
        enabled: true,
        level: "warning",
        threshold: 1, // a single failure is enough
        message: "⚠️ 传感器故障: 距离测量超时 (容器 {container})",
        suppressSeconds: 1 * SECONDS_PER_HOUR
    },
    device_offline: {
        enabled: true,
        level: "critical",
        threshold: 2 * SECONDS_PER_HOUR, // seconds without a wake-up
        message: "🔴 设备离线: 已 {hours} 小时未唤醒 (上次: {lastWake})",
        suppressSeconds: 2 * SECONDS_PER_HOUR
    },

    // --- Dosing anomaly alerts ---
    dosing_anomaly: {
        enabled: true,
        level: "warning",
        threshold: 1,
        message: "⚠️ 投料异常: 投料后液位未上升 (容器 {container})",
        suppressSeconds: 1 * SECONDS_PER_HOUR
    }
};
// 存储到 Flow Contextif (flow.get("alertRules") === undefined) { flow.set("alertRules", alertRules);}Alert Engine Implementation
Section titled “Alert Engine Implementation”// Function: 告警引擎// 每次收到传感器数据时检查所有告警规则
var sensorData = msg.payload || flow.get("lastSensorData");var rules = flow.get("alertRules");var alerts = [];
// ===== 液位告警检查 =====var levelPercent = sensorData.level || 0;
if (levelPercent >= 0) { // 低液位告警 if (rules.low_level.enabled && levelPercent < rules.low_level.threshold) { alerts.push(checkAlert("low_level", { level: levelPercent, container: "A" })); }
// 严重低液位告警 if (rules.critical_level.enabled && levelPercent < rules.critical_level.threshold) { alerts.push(checkAlert("critical_level", { level: levelPercent, container: "A" })); }}
// ===== 趋势告警检查 =====var prevLevel = flow.get("previousLevel");if (prevLevel !== undefined && levelPercent >= 0) { var drop = prevLevel - levelPercent; if (rules.rapid_drop.enabled && drop >= rules.rapid_drop.threshold) { alerts.push(checkAlert("rapid_drop", { from: prevLevel, to: levelPercent, container: "A" })); }}flow.set("previousLevel", levelPercent);
// ===== 传感器故障检查 =====if (sensorData.error === "sensor_failure" && rules.sensor_failure.enabled) { alerts.push(checkAlert("sensor_failure", { container: "A" }));}
// ===== 处理告警抑制 =====function checkAlert(ruleName, params) { var rule = rules[ruleName]; var suppressKey = "suppress_" + ruleName; var lastAlert = flow.get(suppressKey) || 0; var now = Date.now();
// 检查抑制期 if (now - lastAlert < rule.suppressSeconds * 1000) { node.log("Alert suppressed: " + ruleName + " (in suppress period)"); return null; }
// 更新抑制时间 flow.set(suppressKey, now);
// 返回告警消息 var message = rule.message; for (var key in params) { message = message.replace("{" + key + "}", params[key]); }
return { rule: ruleName, level: rule.level, message: message, params: params, timestamp: now };}
// ===== 处理触发告警 =====var activeAlerts = alerts.filter(function(a) { return a !== null; });if (activeAlerts.length > 0) { // 更新告警计数 var alertCounters = flow.get("alertCounters") || {}; activeAlerts.forEach(function(a) { alertCounters[a.rule] = (alertCounters[a.rule] || 0) + 1; }); flow.set("alertCounters", alertCounters);
// 输出告警 msg.alerts = activeAlerts; return [msg, activeAlerts]; // 使用多输出: 0=正常, 1=告警}
return [msg, null];Alert Notification via Telegram
Section titled “Alert Notification via Telegram”
// Function: 发送 Telegram 告警通知
// 接收告警引擎的输出 (msg.payload 为告警对象数组)
var alerts = msg.payload;
if (!alerts || alerts.length === 0) return null;
var chatId = flow.get("telegramChatId") || "987654321";
// 构建通知消息var message = "🚨 *自动投料系统告警*\n\n";
alerts.forEach(function(alert) { var emoji = alert.level === "critical" ? "🔴" : "⚠️"; message += emoji + " *[" + alert.level.toUpperCase() + "]* "; message += alert.message + "\n\n";});
// 添加系统状态message += "---\n";message += "📊 当前液位: " + (flow.get("lastLevel") || "-") + "%\n";message += "🕐 时间: " + new Date().toLocaleString();
// 配置 Telegram Sender 输出msg.payload = { chatId: chatId, type: "message", content: message, parse_mode: "Markdown"};
return msg;Alert Escalation Logic
Section titled “Alert Escalation Logic”
// Function: 告警升级逻辑
// 同一告警累计触发达到 3 次时升级通知;每条规则 1 小时内最多升级一次
var counters = flow.get("alertCounters") || {};var escalation = flow.get("alertEscalation") || {};var escalatedAlerts = [];var now = Date.now();
for (var rule in counters) { if (counters[rule] >= 3) { // 检查是否已经升级过 var lastEscalation = escalation[rule] || 0;
if (now - lastEscalation > 3600000) { // 1 小时内只升级一次 escalatedAlerts.push({ rule: rule, originalCount: counters[rule], message: "🔴 [升级] 告警 \"" + rule + "\" 已触发 " + counters[rule] + " 次,请立即关注!", timestamp: now }); escalation[rule] = now; } }}
flow.set("alertEscalation", escalation);
if (escalatedAlerts.length > 0) { // 升级通知发送给更高级别 (如经理) msg.payload = escalatedAlerts; msg.channel = "manager"; // 区分普通通知和升级通知 return msg;}
return null; // 无需升级# 1. 测试低液位告警mosquitto_pub -t "esp32/dosing/info" \ -m '{"distance":22,"level":15,"boot":6}'# 预期: ⚠️ 液位偏低: 15% (容器 A)
# 2. Critical low-level alert
mosquitto_pub -t "esp32/dosing/info" \
  -m '{"distance":23,"level":3,"boot":7}'
# Expected: 🔴 液位过低: 3% - 请立即补充原料!

# 3. Sensor failure
mosquitto_pub -t "esp32/dosing/status" \
  -m '{"state":"error","error":"sensor_failure"}'
# Expected: ⚠️ 传感器故障: 距离测量超时

# 4. Alert suppression
# Publish the low-level alert three times in a row:
#   1st publish: a notification is sent
#   2nd and 3rd publish (within 5 minutes): suppressed, nothing is sent
#   after 5 minutes: the alert can be sent again
# 5. 验证 Telegram 接收# 手机端应收到格式化的告警消息Common Customer Questions
Section titled “Common Customer Questions”Q1: 如何避免夜间告警打扰?
Section titled “Q1: 如何避免夜间告警打扰?”// Function: 按时间过滤通知var hour = new Date().getHours();
if (hour >= 22 || hour < 7) { // 夜间: 仅严重告警 (critical) 才通知 // warning 级别静默 if (msg.payload.level !== "critical") { return null; // 丢弃 }}return msg;Q2: 告警太多怎么办?
Section titled “Q2: 告警太多怎么办?”- 合理设置抑制期(同一告警短时间内不重复)
- 使用告警级别区分(Info/Warning/Critical)
- 配置告警聚合(同一设备的多条告警合并为一条)
- 设置静默时段(如夜间仅 Critical 告警通知)
Q3: 如何记录告警历史?
Section titled “Q3: 如何记录告警历史?”// Function: 记录告警到 MariaDBmsg.topic = "INSERT INTO alert_history " + "(rule, level, message, timestamp) VALUES (" + "'" + alert.rule + "', " + "'" + alert.level + "', " + "'" + alert.message.replace(/'/g, "''") + "', " + Math.floor(Date.now() / 1000) + ")";return msg;✅ 推荐做法:
- 设置告警抑制期避免消息轰炸
- 使用告警级别分级 (Info/Warning/Critical)
- 关键告警 (Critical) 不过滤,确保送达
- 告警信息包含上下文(时间、液位、设备)
- 告警引擎独立于核心逻辑运行
❌ 避免做法:
- 所有告警使用相同级别和频率
- 忽略告警抑制导致 Telegram 消息刷屏
- 告警消息不含设备/容器标识
- 告警引擎阻塞主业务流程
Summary
Section titled “Summary”
- 告警规则: 低液位/快速下降/传感器故障/设备离线/投料异常
- 告警引擎: 阈值检查 → 抑制 → 通知
- 抑制机制: 同类型告警在抑制期内不重复 (按规则配置,5 分钟至 2 小时)
- 升级逻辑: 频繁触发的告警自动升级通知级别
- 通知渠道: Telegram 即时推送 + Email 正式通知