MQTT学习使用

1、长连接

1.1、为什么用MQTT

1.1.1、ws方案弊端

  1. wss长连接客户端与服务pod硬耦合,使得服务必须脱离于k8s体系,稳定性难以保障。
  2. 服务本地内存 存有用户链接信息,pk上下文信息,导致服务为有状态服务。不能正常无损发版更新。
  3. 客户端之间的信息互通,链路长,依赖的组件和数据结构多,通信质量难以保证。
  4. wss链接的管理比较粗糙,消息通信机制原始,强依赖tcp可靠性,没有额外保障,丢消息概率大。
  5. 由于wss链接是每个user专属一条。并发情况下pod需要维持大量长连接,但是这些长连接,通信频率低、吞吐量低,造成k8s node资源的极大浪费。

1.1.2、mqtt方案优势

  1. 发布订阅架构,业务仅仅需要关注由topic定义的路由规则,而不需要关注底层链接的管理。
  2. 有成熟的长连接保活、探活机制,把客户端的断连、上线,封装成完善的系统事件,供业务使用。
  3. 有完善的消息保障机制,来保障消息的可靠性传输,详情见下文阐述。
  4. 将客户端链接与服务端pod解耦,使得服务端可以像其他无状态服务一样,重启、发版、HPA。
  5. 通过一定的信道设计和消息处理机制设计,能实现高效的通信,单条链接至少1w条/s的吞吐量,且可对每条消息维度进行负载均衡。

1.2、MQTT长连接方案

1.2.1、mqtt信道设计

客户端->服务端

1
$share/group_oral/oral/english/pk/server/+

服务器端的共享订阅+qos 1的可靠性传输+0会话保存机制。可做到如下可靠性保障:

  1. 从客户端到服务端的消息发送成功后至少到达一次。保障消息不丢失。
  2. 从客户端到服务端的每条消息都会进行负载均衡。保障服务器的负载均衡。
  3. 服务端某一链接异常断连后,客户端消息可以迅速投递的同组下一个链接中。

服务端->客户端

1
"oral/english/pk/client/"+pkID

客户端pkid订阅+qos 1的可靠性传输+离线会话保持机制,可做到如下可靠性保障:

  1. 非极端case从服务端到客户端的消息至少到达一次。极端case:弱网离线状态下,重连不超过会话保持时间保障消息不丢。
  2. 服务端到pk的用户客户端的消息保持发送的顺序。不会因为极端网络环境导致消息失序。

1.2.2、服务端消息处理设计

在上述阐述中,我们借助mqtt服务器实现了,消息的可靠性传输的信道。但是信道的吞吐量和延时仍需特殊设计。单纯顺序性的消费保障不了吞吐量与低延时。

服务端快速ack+多协程并发处理响应

  1. 如信道设计图,服务器的链接虽然变少了,但是稍有不慎就会造成读写的线头阻塞,导致总体服务质量下降。
  2. 牺牲处理消息的顺序性,保障大并发的入流量,客户端低等待延时。

1.3、服务端长连接保障

1.3.1、服务平滑退出

http服务pod的平滑退出,由成熟的负载均衡组件(ingress或SLB)来做上线和摘除。而大部分tcp长连接服务,是需要自己HOOK退出事件来做到平滑退出。

本次口算服务mqtt的长连接,是监听了进程退出信号,来自主摘除topic订阅,同时配合k8s关闭前timesleep,来保障无损退出。

1.3.2、长连接异常断开

见信道设计中服务端的介绍,我们通共享订阅+0会话可以做到,异常断连后迅速负载至同组内其他链接。保障断连消息不丢。

同时设置自动重连机制,来保障mqtt客户端异常断连后各个pod之间发生长时间的负载不均情况。

1.3.3、长连接延时

对于mqtt方案的长连接的延时,有做过相应的测试。测试条件:公网本机模拟服务端和客户端 1000 、1w条消息发送等待到达并计算延时,三次实验。

1
消息体 fmt.Sprintf(`{"user_id":"%d","content":"hello"}`, i)

1000 发送接收延时:

第一次 第二次 第三次
p50: 29.385msp90: 39.642msp99: 50.374ms p50: 24.436msp90: 34.08msp99: 59.66ms p50: 29.162msp90: 38.805msp99: 46.829ms

1w 发送接收延时:

第一次 第二次 第三次
p50: 26.729msp90: 35.759msp99: 44.011ms p50: 27.974msp90: 37.357msp99: 45.933ms p50: 27.225msp90: 35.739msp99: 43.872ms

1.3.4、长连接吞吐量

对于mqtt方案的长连接的延时,有做过相应的测试。测试条件:公网本机模拟服务端和客户端 1w、10w条消息发送三次并确认到达,仅计算发送耗时。

1
消息体 fmt.Sprintf(`{"user_id":"%d","content":"hello"}`, i)

1w: 发送总耗时 456.590209ms 586.06925ms 635.62ms

10w:发送总耗时 4.681789666s 4.062220334s 5.029271834s

总结公司wifi下外网发送吞吐量约2w条/s

2、 mqtt概述

MQTT (Message Queue Telemetry Transport) 是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的信息采集,工业控制,智能家居等方面具有广泛的适用性。

  • MQTT更加简单:MQTT是一种消息队列协议,使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合,相对于其他协议,开发更简单;
  • MQTT网络更加稳定:工作在TCP协议上;由TCP协议提供稳定的网络连接;
  • 轻量级:小型传输,开销很小,协议交换最小化,以降低网络流量;适合低带宽,数据量较小的应用。

MQTT支持三种消息发布服务质量(QoS)

  • “至多一次”(QoS==0):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • “至少一次”(QoS==1):确保消息到达,但消息重复可能会发生。
  • “只有一次”(QoS==2):确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

MQTT协议三种身份

发布者、代理、订阅者,发布者和订阅者都为客户端,代理为服务器,同时消息的发布者也可以是订阅者(为了节约内存和流量发布者和订阅者一般都会定义在一起)。

MQTT传输的消息分为主题(Topic,可理解为消息的类型,订阅者订阅后,就会收到该主题的消息内容(payload))和负载(payload,可以理解为消息的内容)两部分。

mqtt与websocket的区别

相同点:

  • MQTT 和 WebSocket 都是应用层协议
  • 目前底层都是使用 TCP 协议确保可靠传输数据
  • 都规定了自己的报文(消息)结构
  • 都支持双向通信
  • 都使用二进制编码(有别于 HTTP 这一类基于文本编码的协议)

不同点:

websocket mqtt
通信模型不同 webSocket 是一种简单的报文协议,着重解决浏览器和服务端不能进行双向通信的问题。WebSocket 仅仅定义了会话的发起方式和报文格式及类型。如何使用报文通信全由应用程序(各浏览器)控制。 MQTT 则是一种比较复杂的消息协议。MQTT 不仅规定了具体的协议编码,还规定了客户端和服务器的通信模型。具体来说就是MQTT是一种面向主题(topic)的消息广播协议。客户端可以创建、加入和订阅任意主题,并向主题发布消息或者接收广播消息。除此之外,MQTT 还规定了消息的投放级别(QoS),支持至少一次、至多一次和精确投递三种级别,在协议层规定了是否会产生重复投递。
报文结构不同 WebSocket 报文相对简单 mqtt相对复杂
消息收发方式不同 WebSocket 收发消息不需要对方确认。因为底层的 TCP 协议会完成可靠传输 MQTT 收发消息需要根据投递级别进行确认

2.1 mqtt底层原理

mqtt协议底层方法

CONNECT:客户端连接到服务器

CONNACK:连接确认

PUBLISH:发布消息

PUBACK:发布确认

PUBREC:发布的消息已接收

PUBREL:发布的消息已释放

PUBCOMP:发布完成

SUBSCRIBE:订阅请求

SUBACK:订阅确认

UNSUBSCRIBE:取消订阅

UNSUBACK:取消订阅确认

PINGREQ:客户端发送心跳

PINGRESP:服务端心跳响应

DISCONNECT:断开连接

AUTH:认证

mqtt协议数据格式

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体 (payload)三部分构成。MQTT数据包结构如下:

固定头

固定头存在于所有MQTT数据包中, 固定头包含两部分内容,首字节(字节1)和剩余消息报文长度(从第二个字 节开始,长度为1-4字节),剩余长度是当前包中剩余内容长度的字节数,包括变量头和有效负载中的数据)

数据包类型

标识位

首字节的低4位(bit3~bit0)用来表示某些报文类型的控制字段,实际上只有少数报文类型有控制位。

其中Bit[3]为DUP字段,如果该值为1,表明这个数据包是一条重复的消息;否则该数据包就是第一次发布的消息。

如果Bit 1和Bit 2都为0,表示QoS 0:至多一次;如果Bit 1为1,表示QoS 1:至少一次;如果Bit 2为1,表示QoS 2:只有一次;如果同时将Bit 1和Bit 2都设置成1,那么客户端或服务器认为这是一条非法的消息,会关闭当前连接。

可变头

可变头的意思是可变化的消息头部。有些报文类型包含可变头部有些报文则不包含。可变头部在固定头部和消 息内容之间,其内容根据报文类型不同而不同。

2.1.1 不同投递级别的实现

qos=0

第一步:Publisher 将消息PUBLISH到Broker中去,发出去后自己将消息删除。

第二步:Broker接收到消息之后直接发往Subscriber,他们三者之间是不存在确认关系的,没有确认机制,消息收不收的到无所谓。完成一次服务的通信。

qos=1

第一步:Publisher 将消息 Store(存储)一份在自己这里,然后PUBLISH到Broker中去。

第二步:Broker存储一份消息之后就将信息PUBLISH到Subscriber中,然后给Publisher一个PUBACK的确认,确认已经发布到Subscrliber中,Publisher才将自己的信息删除。

第三步:Subscrliber收到信息之后,会给Broker发送一个PUBACK确认,确认信息已收到,Broker拿到确认之后删除自身的信息。完成一次服务的通信。

qos=2

第一步:Publisher先存储一份消息,再将消息PUBLISH到Broker中。

第二步:Broker得到消息之后先存一份消息,然后PUBREC告诉Publisher我已经收到消息,Publisher接收到请求之后给Broker回一个PUBREL可以发送信息到Subscrliber了,然后Broker就把信息PUBLISH到Subscrliber上,接着就给Publisher回一个PUBCOMP表示消息已发送完成,Publisher接收到这个请求之后就将自身存储的信息删除。

第三步:Subscrliber接收到信息之后先Store存储信息,存储完成之后向Broker发送一个PUBREC表示消息已经收到,Broker接收到请求之后回应一个PUBREL表示消息可以释放,也就是说Subscrliber可以用此消息去处理业务请求了,Subscrliber将消息Notify(通知)上层应用进行业务处理,业务处理完成Subscrliber会给Broker回复一个PUBCOMP表示业务处理已经完成,Broker就可以把信息删除,接着Subscrliber也把自身信息删除,完成一次服务的通信。

2.1.2 mqtt常用的消息类型

topic说明:

MQTT 主题名称是用于消息路由的 UTF-8 编码字符串。为了提供更大的灵活性,MQTT 支持分层主题命名空间。主题通常按层级分级,并使用斜杠 / 在级别之间进行分隔。在使用时候不需要提前创建topic。

保留消息:

发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。(如果设置为false,新建立连接的客户端无法收到最近的一条消息)

通配符订阅:

单层通配符:+ 是一个通配符字符,仅匹配一个主题层级

1
2
3
4
"+" 有效
"sensor/+" 有效
"sensor/+/temperature" 有效
"sensor+" 无效 (没有占据整个层级)

多层通配符:# 匹配主题中的任意层级。多层通配符表示它的父级和任意数量的子层级。

1
2
3
4
"#" 有效,匹配所有主题
"sensor/#" 有效
"sensor/bedroom#" 无效 (没有占据整个层级)
"sensor/#/temperature" 无效 (不是主题最后一个字符)

延迟消息

(emqx 拥有的扩展功能)延迟发布是 EMQX 支持的 MQTT 扩展功能。当客户端使用特殊主题前缀$delayed/{DelayInteval} 发布消息时,将触发延迟发布功能,可以实现按照用户配置的时间间隔延迟发布消息。 (仅支持秒级)

1
$delayed/{DelayInterval}/{TopicName}

2.2 emqx

mqtt本身是一种网络协议,有多种实现此协议的中间件如(emqx、mosquitto)本次主要讲emqx。

2.2.1 负载均衡

当在 EMQX 中部署 LB (负载均衡器) 后,LB 会负责处理 TCP 连接,并将收到的 MQTT 连接与消息分发到不同的 EMQX 集群节点。

2.2.2 EMQX 分布式集群设计

EMQX 分布式集群的基本功能是转发和发布消息到订阅者

emqx需要维护订阅表、路由表、主题树

订阅表

用于存储 主题->订阅者 之间的映射关系,从而确保能将传入消息正确路由到对应的客户端。该数据只存在于订阅者所在的 EMQX 节点上,类似的结构如下:

1
2
3
4
5
node1:
topic1 -> client1, client2
topic2 -> client3
node2:
topic1 -> client4

路由表:

路由表记录了主题->节点之间的映射,它存储每个节点上客户端订阅的主题列表,并用于将消息路由到对应的节点。该数据会在同一集群中的所有节点复制一份。

1
2
3
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4

主题树:

主题树是一种分层数据结构,它存储有关主题层次结构的信息,并用于消息与订阅客户端的匹配。主题树会在同一集群中的所有节点复制一份。

Client Node Subscribed topic
client1 node1 t/+/x, t/+/y
client2 node2 t/#
client3 node3 t/+/x, t/a

当一个 MQTT 客户端发布消息时,它所在的节点会查找路由表,并根据消息主题将消息转发到对应的节点(可能是多个节点)。

然后,接收到消息的节点会查找本地订阅表,并将消息发送至对应的订阅者。

例如,当客户端 1 发布一条消息到主题 t/a 时,消息在节点之间的路由和分发如下:

  1. 客户端 1节点 1 发布一条主题为 t/a 的消息;
  2. 节点 1 查询主题树,了解到 t/a 与现有主题 t/at/# 相匹配。
  3. 节点 1 查询路由表,并得知:
    1. 节点 2 上有客户端订阅了 t/# 主题;
    2. 节点 3 上有客户端订阅了 t/a 主题;因此节点 1 会将消息同时转发给节点 2节点 3
  4. 节点 2 收到转发的 t/a 消息后,通过查询本地订阅表,将消息分发给订阅了 t/# 的客户端。
  5. 节点 3 收到转发的 t/a 消息后,通过查询本地订阅表,将消息分发给订阅了 t/a 的客户端。
  6. 消息发布完成。

2.2.3 emqx5.0

MQX 节点之间的连接模式从 Mnesia 的全网状拓扑结构转向 Mria 的网状+星型状拓扑结构,集群中节点可以按角色分为核心节点(Core)或复制者节点(Replicant)。

核心节点

核心节点作为数据库的数据层,节点间以全网状连接,每个节点都包含一个最新的数据副本,这保证了容错性:只要有一个节点存活,数据就不会丢失。核心节点一般是静态和持久的,不建议进行自动伸缩(即经常添加、删除或替换节点)。

复制节点

复制节点会连接到核心节点,并被动地复制来自 核心节点的数据更新。复制节点不允许执行任何的写操作,而是将其转交给核心节点代为执行。同时,由于复制节点有一个完整的本地数据副本,因此数据读取速度非常快,这样有助于降低 EMQX 路由的时延。

问题解决

以上已经解决了前两个问题,第三个问题。在emqx中提供了建立连接、订阅,取消订阅、超时重连,探活等一系列功能。通过相应的配置(keepalive、pingtimeout、cleansession)等参数即可实现。(后端可以在服务启动时建立连接、前端则根据具体场景选择合适的时机建立连接并订阅)。


MQTT学习使用
https://zty-f.github.io/2025/01/20/MQTT学习使用/
作者
ZTY
发布于
2025年1月20日
更新于
2025年3月6日
许可协议