跳转到内容

发布订阅架构

本节深入介绍 MQTT 的发布-订阅(Publish-Subscribe)架构模式。学习完成后,您将能够:

  • 理解发布-订阅模式的原理和优势
  • 区分发布者、代理服务器和订阅者的角色
  • 对比发布-订阅模式与请求-响应模式的区别
  • 解释 MQTT 架构如何支持 IoT 场景

在开始本节之前,请确保:

  • 已完成 MQTT 协议概述
  • 了解基本的网络通信模式
  • Mosquitto Broker 已安装运行

MQTT 采用发布-订阅(Pub-Sub)消息传递模式,与传统的客户端-服务器(请求-响应)模式有本质区别:

传统请求-响应模式(如 HTTP)

Client ──── Request ────→ Server
Client ←─── Response ──── Server

MQTT 发布-订阅模式

Publisher ──── Publish ────→ Broker ──── Deliver ────→ Subscriber
多个 Publisher 可以发布
多个 Subscriber 可以订阅
特性请求-响应 (HTTP)发布-订阅 (MQTT)
通信模式点对点一对多
耦合度强耦合(客户端需知道服务器地址)松耦合(通过 Broker 中转)
时序依赖请求必须等待响应发布者可在没有订阅者时发布
扩展性每个客户端需要独立连接一个发布可服务多个订阅者
实时性轮询或长连接推送模式,实时性更好

Publisher(发布者): 发布者是数据的产生方,负责将消息发送到 Broker 的特定 Topic 上。

特点:

  • 不知道消息会被哪些订阅者接收
  • 不需要等待订阅者的确认
  • 可以根据需要随时发布消息
  • 可以同时发布到多个 Topic

Broker(代理服务器): Broker 是 MQTT 架构的核心组件,负责消息的路由和分发。

职责:

  • 接收来自 Publisher 的消息
  • 根据 Topic 将消息路由给所有订阅者
  • 维护客户端连接状态(在线/离线)
  • 管理会话状态和消息队列

Subscriber(订阅者): 订阅者是数据的消费方,通过订阅 Topic 来接收感兴趣的消息。

特点:

  • 不知道消息来自哪个发布者
  • 可以同时订阅多个 Topic
  • 可以在任何时候订阅或取消订阅
  • 不需要时也可以不订阅任何 Topic
┌──────────────────────────────────────────────────────────┐
│ MQTT 通信流程 │
├──────────────────────────────────────────────────────────┤
│ │
│ ESP32 (Publisher) Broker Node-RED│
│ ┌──────────┐ ┌────────┐ ┌──────────┐ │
│ │ 温度传感器 │ │ │ │ Dashboard│ │
│ │ 发布 26.5°C│─────────→│ Mosquitto│─────────→│ 显示数据 │ │
│ │ topic: │ │ │ │ │ │
│ │ temp/data │ │ │ │ InfluxDB │ │
│ └──────────┘ │ │ │ 存储数据 │ │
│ │ │ └──────────┘ │
│ Shelly (Publisher) │ │ │
│ ┌──────────┐ │ │ Phone (Sub) │
│ │ 功耗数据 │─────────→│ │─────────→│ 接收告警 │ │
│ └──────────┘ └────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────────────┘
Terminal window
# 终端 1: 发布者 (ESP32 模拟温度数据)
mosquitto_pub -h localhost -t "factory/zone1/temperature" \
-m '{"value": 26.5, "unit": "celsius"}' -d
# 终端 2: 订阅者 A (Node-RED 数据处理)
mosquitto_sub -h localhost -t "factory/zone1/temperature" -v
# 终端 3: 订阅者 B (数据存储)
mosquitto_sub -h localhost -t "factory/zone1/temperature" -v
# 终端 4: 订阅者 C (实时仪表板)
mosquitto_sub -h localhost -t "factory/zone1/temperature" -v

预期结果:当终端 1 发布消息时,终端 2、3、4 同时收到相同的消息内容。

Terminal window
# 多个传感器发布到不同 Topic
# 传感器 1: 温度
mosquitto_pub -h localhost -t "sensors/temperature" -m "26.5"
# 传感器 2: 湿度
mosquitto_pub -h localhost -t "sensors/humidity" -m "62"
# 传感器 3: 光照
mosquitto_pub -h localhost -t "sensors/light" -m "450"
# 订阅者使用通配符接收所有传感器数据
mosquitto_sub -h localhost -t "sensors/#" -v

发布者和订阅者在时间、空间和同步上完全解耦:

  • 空间解耦:双方不需要知道对方的网络地址
  • 时间解耦:发布者不需要订阅者在线即可发布消息
  • 同步解耦:发布和订阅都是异步操作

业务价值

✅ 优势:
- 新增数据消费者无需修改发布者代码
- 发布者离线不影响订阅者
- 系统扩展灵活
❌ 传统方案问题:
- 新增监控终端需修改所有传感器代码
- 传感器离线导致监控系统报错
- 扩展维护成本高
小型部署:
[ESP32] ──→ [Broker] ──→ [Dashboard]
1-10 个设备,单台服务器
中型部署:
[ESP32] ──→ [Broker] ──→ [Dashboard]
[ESP32] ──→ [Storage]
[ESP32] ──→ [Mobile App]
10-100 个设备,单台服务器
大型部署:
[ESP32群] ──→ [Broker 集群] ──→ [Dashboard]
[Shelly群] ──→ [Storage]
[传感器群] ──→ [AI 分析]
[Mobile App]
100-10,000+ 设备,集群部署

动态订阅:订阅者可以在运行时随时增删订阅

// Node-RED 中动态管理订阅
// 运行时添加新 Topic 的订阅
broker.subscribe("newly/added/topic");
// 运行时取消不需要的订阅
broker.unsubscribe("old/unused/topic");

多协议桥接:Broker 可以将 MQTT 桥接到其他系统

MQTT Broker
├──→ MQTT → IoT 设备
├──→ HTTP → Web 应用
├──→ WebSocket → 浏览器
└──→ MQTT Bridge → 云平台
设备定时发送 HTTP 请求检查是否有新数据:
时间线:
T0: 客户端 → HTTP GET → 服务器 (无新数据)
T1: 客户端 → HTTP GET → 服务器 (无新数据)
T2: 客户端 → HTTP GET → 服务器 (有新数据)
T3: 客户端 → HTTP GET → 服务器 (无新数据)
问题:
- 大量无效请求浪费带宽
- 数据更新延迟取决于轮询间隔
- 服务器负载随客户端数量线性增长
设备通过持久连接实时接收推送:
时间线:
T0: 客户端建立 MQTT 连接(仅一次)
T1: 无数据 → 无网络流量
T2: 服务器推送新数据
T3: 无数据 → 无网络流量
优势:
- 零无效请求
- 毫秒级实时推送
- 服务器负载与数据量相关,不与客户端数量线性增长

Q: 为什么需要一个中央 Broker?是不是单点故障?

A: Broker 确实是一个集中点,但这不是架构缺陷。实际上:

  • MQTT Broker 支持高可用集群部署
  • 单个 Mosquitto 实例可稳定运行数月
  • Broker 故障时,设备数据暂存在本地缓存,恢复后自动续传
  • EMQX 等商业版支持热备切换

Q: 一对多通信在实际业务中有什么用?

A: 典型场景:一个温度传感器的数据可以同时用于:

  1. 实时仪表板展示(运营部门)
  2. 数据库存储(数据分析部门)
  3. 告警通知(维护部门)
  4. 自动控制(空调系统) 无需为每个用途都独立部署传感器。

本节要点总结:

  1. 发布-订阅模式:通过 Broker 解耦发布者和订阅者,实现灵活的一对多通信
  2. 核心优势:解耦、可扩展、灵活,适合 IoT 场景
  3. 与 HTTP 对比:MQTT 推送模式比 HTTP 轮询更高效、实时性更好
  4. 业务价值:减少无效网络流量,降低服务器负载,支持动态扩展