跳转到内容

告警系统

告警系统

本节介绍自动投料系统的告警系统设计,包括低液位告警、设备故障告警和异常检测。学习完成后,您将能够:

  • 设计多级告警规则
  • 实现 Telegram/Email 告警通知
  • 配置告警抑制和升级机制
  • 处理传感器故障和设备离线告警
┌──────────────────────────────────────────────────────────────┐
│ 告警系统架构 │
├──────────────────────────────────────────────────────────────┤
│ │
│ [告警源] │
│ ├── 液位过低 (level < 20%) │
│ ├── 容器缺水 (level < 5%) │
│ ├── 液位快速下降 (趋势异常) │
│ ├── 传感器故障 (measurement timeout) │
│ ├── 设备离线 (唤醒间隔 > 2 小时) │
│ └── 投料异常 (投料后液位无变化) │
│ │
│ ▼ │
│ [Node-RED 告警引擎] │
│ │
│ ├── 1. 告警级别判定 (Info/Warning/Critical) │
│ ├── 2. 告警抑制 (同类型 5 分钟内不重复) │
│ ├── 3. 告警升级 (连续触发 3 次 → 升级) │
│ └── 4. 通知路由 │
│ │
│ ▼ │
│ [通知通道] │
│ ├── Telegram Bot (即时推送) │
│ ├── Email (正式通知) │
│ └── Dashboard (状态显示) │
│ │
└──────────────────────────────────────────────────────────────┘
// Node-RED Function: 告警规则引擎 (shared config)
// 告警规则配置
var alertRules = {
// === 液位告警 ===
low_level: {
enabled: true,
level: "warning",
threshold: 20, // 液位 < 20%
message: "⚠️ 液位偏低: {level}% (容器 {container})",
suppressSeconds: 300 // 5 分钟内不重复
},
critical_level: {
enabled: true,
level: "critical",
threshold: 5, // 液位 < 5%
message: "🔴 液位过低: {level}% - 请立即补充原料!",
suppressSeconds: 600 // 10 分钟内不重复
},
// === 趋势告警 ===
rapid_drop: {
enabled: true,
level: "critical",
threshold: 15, // 两次唤醒间液位下降 > 15%
message: "🔴 液位快速下降: 从 {from}% 降至 {to}% - 可能泄漏!",
suppressSeconds: 1800 // 30 分钟内不重复
},
// === 设备告警 ===
sensor_failure: {
enabled: true,
level: "warning",
threshold: 1, // 1 次失败即告警
message: "⚠️ 传感器故障: 距离测量超时 (容器 {container})",
suppressSeconds: 3600 // 1 小时内不重复
},
device_offline: {
enabled: true,
level: "critical",
threshold: 7200, // 超过 2 小时未唤醒
message: "🔴 设备离线: 已 {hours} 小时未唤醒 (上次: {lastWake})",
suppressSeconds: 7200 // 2 小时内不重复
},
// === 投料异常告警 ===
dosing_anomaly: {
enabled: true,
level: "warning",
threshold: 1,
message: "⚠️ 投料异常: 投料后液位未上升 (容器 {container})",
suppressSeconds: 3600
}
};
// 存储到 Flow Context
if (flow.get("alertRules") === undefined) {
flow.set("alertRules", alertRules);
}
// Function: 告警引擎
// 每次收到传感器数据时检查所有告警规则
var sensorData = msg.payload || flow.get("lastSensorData");
var rules = flow.get("alertRules");
var alerts = [];
// ===== 液位告警检查 =====
var levelPercent = sensorData.level || 0;
if (levelPercent >= 0) {
// 低液位告警
if (rules.low_level.enabled && levelPercent < rules.low_level.threshold) {
alerts.push(checkAlert("low_level", {
level: levelPercent,
container: "A"
}));
}
// 严重低液位告警
if (rules.critical_level.enabled && levelPercent < rules.critical_level.threshold) {
alerts.push(checkAlert("critical_level", {
level: levelPercent,
container: "A"
}));
}
}
// ===== 趋势告警检查 =====
var prevLevel = flow.get("previousLevel");
if (prevLevel !== undefined && levelPercent >= 0) {
var drop = prevLevel - levelPercent;
if (rules.rapid_drop.enabled && drop >= rules.rapid_drop.threshold) {
alerts.push(checkAlert("rapid_drop", {
from: prevLevel,
to: levelPercent,
container: "A"
}));
}
}
flow.set("previousLevel", levelPercent);
// ===== 传感器故障检查 =====
if (sensorData.error === "sensor_failure" && rules.sensor_failure.enabled) {
alerts.push(checkAlert("sensor_failure", {
container: "A"
}));
}
// ===== 处理告警抑制 =====
function checkAlert(ruleName, params) {
var rule = rules[ruleName];
var suppressKey = "suppress_" + ruleName;
var lastAlert = flow.get(suppressKey) || 0;
var now = Date.now();
// 检查抑制期
if (now - lastAlert < rule.suppressSeconds * 1000) {
node.log("Alert suppressed: " + ruleName + " (in suppress period)");
return null;
}
// 更新抑制时间
flow.set(suppressKey, now);
// 返回告警消息
var message = rule.message;
for (var key in params) {
message = message.replace("{" + key + "}", params[key]);
}
return {
rule: ruleName,
level: rule.level,
message: message,
params: params,
timestamp: now
};
}
// ===== 处理触发告警 =====
var activeAlerts = alerts.filter(function(a) { return a !== null; });
if (activeAlerts.length > 0) {
// 更新告警计数
var alertCounters = flow.get("alertCounters") || {};
activeAlerts.forEach(function(a) {
alertCounters[a.rule] = (alertCounters[a.rule] || 0) + 1;
});
flow.set("alertCounters", alertCounters);
// 输出告警
msg.alerts = activeAlerts;
return [msg, activeAlerts]; // 使用多输出: 0=正常, 1=告警
}
return [msg, null];
// Function: 发送 Telegram 告警通知
// 接收警报引擎的输出
var alerts = msg.payload;
if (!alerts || alerts.length === 0) return null;
var chatId = flow.get("telegramChatId") || "987654321";
// 构建通知消息
var message = "🚨 *自动投料系统告警*\n\n";
alerts.forEach(function(alert) {
var emoji = alert.level === "critical" ? "🔴" : "⚠️";
message += emoji + " *[" + alert.level.toUpperCase() + "]* ";
message += alert.message + "\n\n";
});
// 添加系统状态
message += "---\n";
message += "📊 当前液位: " + (flow.get("lastLevel") || "-") + "%\n";
message += "🕐 时间: " + new Date().toLocaleString();
// 配置 Telegram Sender 输出
msg.payload = {
chatId: chatId,
type: "message",
content: message,
parse_mode: "Markdown"
};
return msg;
// Function: 告警升级逻辑
// 如果同一告警在 1 小时内触发超过 3 次,升级通知级别
var counters = flow.get("alertCounters") || {};
var escalation = flow.get("alertEscalation") || {};
var escalatedAlerts = [];
var now = Date.now();
for (var rule in counters) {
if (counters[rule] >= 3) {
// 检查是否已经升级过
var lastEscalation = escalation[rule] || 0;
if (now - lastEscalation > 3600000) { // 1 小时内只升级一次
escalatedAlerts.push({
rule: rule,
originalCount: counters[rule],
message: "🔴 [升级] 告警 \"" + rule + "\" 已触发 " +
counters[rule] + " 次,请立即关注!",
timestamp: now
});
escalation[rule] = now;
}
}
}
flow.set("alertEscalation", escalation);
if (escalatedAlerts.length > 0) {
// 升级通知发送给更高级别 (如经理)
msg.payload = escalatedAlerts;
msg.channel = "manager"; // 区分普通通知和升级通知
return msg;
}
return null; // 无需升级
Terminal window
# 1. 测试低液位告警
mosquitto_pub -t "esp32/dosing/info" \
-m '{"distance":22,"level":15,"boot":6}'
# 预期: ⚠️ 液位偏低: 15% (容器 A)
# 2. 测试严重低液位告警
mosquitto_pub -t "esp32/dosing/info" \
-m '{"distance":23,"level":3,"boot":7}'
# 预期: 🔴 液位过低: 3% - 请立即补充原料!
# 3. 测试传感器故障
mosquitto_pub -t "esp32/dosing/status" \
-m '{"state":"error","error":"sensor_failure"}'
# 预期: ⚠️ 传感器故障: 距离测量超时
# 4. 测试告警抑制
# 连续发送 3 次低液位告警
# 第 1 次: 发送通知
# 第 2,3 次 (5 分钟内): 抑制,不发送
# 5 分钟后: 可再次发送
# 5. 验证 Telegram 接收
# 手机端应收到格式化的告警消息
// Function: 按时间过滤通知
var hour = new Date().getHours();
if (hour >= 22 || hour < 7) {
// 夜间: 仅严重告警 (critical) 才通知
// warning 级别静默
if (msg.payload.level !== "critical") {
return null; // 丢弃
}
}
return msg;
  1. 合理设置抑制期(同一告警短时间内不重复)
  2. 使用告警级别区分(Info/Warning/Critical)
  3. 配置告警聚合(同一设备的多条告警合并为一条)
  4. 设置静默时段(如夜间仅 Critical 告警通知)
// Function: 记录告警到 MariaDB
msg.topic = "INSERT INTO alert_history " +
"(rule, level, message, timestamp) VALUES (" +
"'" + alert.rule + "', " +
"'" + alert.level + "', " +
"'" + alert.message.replace(/'/g, "''") + "', " +
Math.floor(Date.now() / 1000) + ")";
return msg;

推荐做法:

  • 设置告警抑制期避免消息轰炸
  • 使用告警级别分级 (Info/Warning/Critical)
  • 关键告警 (Critical) 不过滤,确保送达
  • 告警信息包含上下文(时间、液位、设备)
  • 告警引擎独立于核心逻辑运行

避免做法:

  • 所有告警使用相同级别和频率
  • 忽略告警抑制导致 Telegram 消息刷屏
  • 告警消息不含设备/容器标识
  • 告警引擎阻塞主业务流程
  1. 告警规则: 低液位/快速下降/传感器故障/设备离线
  2. 告警引擎: 阈值检查 → 抑制 → 通知
  3. 抑制机制: 同类型告警 5-30 分钟不重复
  4. 升级逻辑: 频繁触发的告警自动升级通知级别
  5. 通知渠道: Telegram 即时推送 + Email 正式通知