MQTT(Message Queuing Telemetry Transport 消息队列遥测传输协议) 是一种基于发布/订阅(publish/subscribe)模式的 “轻量级” 通讯协议,该协议构建于 TCP/IP 协议上,最早由 IBM 在 1999 年发布。 目前由 OASIS 维护 MQTT 标准

OASIS 这个组织维护的标准大部分人应该都没听说过

不过 AMQP — Advanced Message Queuing Protocol 大概不用多说 。。。大名鼎鼎的 RabbitMQ 就是实现的这个协议

MQTT 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分

由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

  1. 精简,不添加可有可无的功能;
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;
  3. 允许用户动态创建主题,零运维成本;
  4. 把传输量降到最低以提高传输效率;
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内;
  6. 支持连续的会话控制;
  7. 理解客户端计算能力可能很低;
  8. 提供服务质量管理;
  9. 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

ClientID

这个必须是唯一的。如果没有设置就会随机生成一个

如果有 ClientID 相同的就会把之前的挤掉

Last Will (遗愿消息)

MQTT 客户端向服务器端 CONNECT 请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。

注:

遗愿消息只有 CONNECT 才能设置

MQTT 客户端异常下线时(客户端断开前未向服务器发送 DISCONNECT 消息),MQTT 消息服务器会发布遗愿消息。

如果你想区分连接状态(网络错误和主动关闭)的话 应该自定义一个状态的 topic 配合 Last Will 来使用

Retain(驻留)

MQTT 客户端向服务器发布(PUBLISH)消息时,可以设置驻留消息(Retained Message)标志。 驻留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。

同一 Topic 下的 Retain 消息有且只能有一条,后来的驻留消息会取代前一条驻留消息

QoS (质量控制)

qos 描述
0 无任何保证
1 一定会到达
2 一定会到达并且只会到达一次

MQTT 发布消息 QoS 保证不是全部的,是客户端与 Broker 之间的

QOS 是针对与和 Broker 通信,因此 publish 了一条消息并不能保证 subscriber 一定会收到,比如:发消息时订阅还没有完成

mqttv5 新特性:请求响应 可以解决这个问题

不过 mqttv5 这也有问题。。我有要是多个订阅者那(按照你业务具体需求来实现吧,返回个 ${ClientID} ACK 之类的)

Topic (主题:消息路由)

主题 (Topic) 通过 / 分割层级,支持 +, # 通配符:

/aa 是两个完全不同的 topic

+: 表示通配一个层级,例如 a/+,匹配 a/x, a/y

#: 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d

可以这样写:

chat/room/1

chat/#

sensor/10/temperature

sensor/+/temperature

注:

订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

mqtt 5 vs mqtt 3.1.1

目前有这两个主要的版本。。mqttv5 的改变巨大

网络波动引起的丢消息问题

mqttv3.1.1 CleanSession

网络波动引起的 Subscribe 状态的丢失

RabbitMQ 收到消息回执(Message acknowledgment)

这点和 RabbitMQ 不同(抱歉,我没有实际使用过 RabbitMQ。如果这个地方说错了,请务必指出)

Reconnect 之后虽然恢复。但 DisconnectReconnect 之间的信息会丢失

用这个选项是可以解决的 CleanSession 的默认设置是开启的,要关掉这个

还应该把检测掉线延时设置大一点,这样对抗网络波动的能力就更好些

不过这里还有其他问题。。。如果长时间没有恢复(这个看各 broker 是如何实现的) 不过 mqttv5 加入了session-expiry-interval 这个地方更完善了

eclipse Paho 项目

pāho (verb) to broadcast, make widely known, announce, disseminate, transmit (via the Maori dictionary)

paho 是毛利语中的广播的意思

这个项目里有几乎所有语言的 mqtt 库的实现(有钱真好)

Broker

Eclipse Mosquitto

可以把这个当作标准工具来开发,测试,和使用

Mosquitto 提个了非常好用的命令行工具可以用来开发调试

  • mosquitto_sub
  • mosquitto_pub
  • mosquitto_rr

直接用

# 开启起一个 mqtt broker,默认端口为 1883
mosquitto

# 订阅一个名为 test 的 topic
mosquitto_sub -L mqtt://localhost:1883/test

# 推送一条内容为 `233` 的消息到 test
mosquitto_pub -L mqtt://localhost:1883/test -m "233"

# 也可以这样订阅 topic 再推送消息
mosquitto_rr -L mqtt://localhost:1883/test -e test -m "233"

volantmq

用 golang 写的。看起来还不错,不过我没用过

emqx

这个是国人用 erlang 写的,还不错

MQTT Over Websocket

MQTT 协议的 WebSocket 连接,必须采用 binary 模式,并携带子协议 Header:

Sec-WebSocket-Protocol: mqttv5mqttv3.1.1

这个是有标准文档的:Using WebSocket as a network transport

jsonrpc 2.0 Over MQTT

这个好像是我原创的。不建议这样用,虽然我们在生产环境里用的这个。

有些异常情况的处理方案还不够完善。(极端恶劣的现实情况下表现比较骨感)

这个要把 Last Will 消息 和传输考虑进去。要依赖 qos 2 和 Disable CleanSession

topic 虽然是全双工的可以当 tcp 用。状态用另一个 topic 里提供

这里判断 rpc 消息是否到达是有问题的。(办法是有,不过我目前没有找到很优雅的方案)

另一种 mqtt rpc 的方案 (这个使用了 mqttv5 的特性)

mqtt rpc 这个仅供参考

事故和反思

Broker 挂了

曾经有同事从硬件里读数据然后就以这个速率发送给 broker 。。。然后整个 Broker 都被拖垮了

发送速率还是要限制一下。这样高频的数据的意义在哪?(通过云端处理来实时控制不适合使用 mqtt 。。DDS 协议请(这货甚至有 21 种 QoS ))

发送速率建议不要太高。每秒一条,差不多(频率那么高的意义在那?)

为了快速响应设备断网

如果要求快速响应有 keep alive 的(默认 60s ),可以设置很低。

不过个人建议不要设置在 3s 以下(网络延时和时间差(ntp服务)可能会导致问题)

快速响应 keep alive 10-15s 大概可以适配绝大部分情况了。

主动关闭时最好发送关闭的信息。

如果有人非要超级快的快速检测网络状态,你可以这样怼过去:你看 ssh 检测网络掉线也不是立刻能检测到

关于 MQTT 的客户端库:我觉得好多库都有各种各样的问题

MQTT.js

我每次都想吐嘈这个地方

这个库可是支持 aliswxs 协议的哟~

带你了解 ali 协议

带你了解 wx 协议

没看明白? 可以看这个中文的

eclipse/paho.mqtt.golang

这个库只支持 mqttv3.1.1 你敢信?

你在看实现 API

一堆 interface 接口???

eclipse/paho.golang

eclipse 是有 mqttv5 的库的

大概是 paho.mqtt.golang 实现过于魔幻。所以作者重新开了一个新坑(只是这个 API 看起来正常多了)

ruby-mqtt

njh/ruby-mqtt

这个不支持 QoS 2

类似 mqtt 特性支持不全的库还有很多。。这大概也是一大坑点


Reference:

MQTT: The Standard for IoT Messaging

Paho Wiki

Clean Start 与 Session Expiry Interval - MQTT 5.0 新特性

EMQ X

volantmq