spring 本身通过 integration message 的方式支持了 mqtt 的收发,目前还没有让我满意的 spring-boot-starter,那么就自己写一个吧。

在开始之前,如果不了解 spring-integration 可以参考:spring integration 入门知识 - Some Site (kicey.site)

整个 starter 建立在 spring-integration-mqtt 之上,相关内容:spring-integration-mqtt 概览(源码简析) (kicey.site)

整体设计

mqtt 特性

在阐明设计之前需要阐述一些 mqtt 的特性:

  • client 只能有一个实例(同一个 client_id)连接到中间件上,也不能多 TCP 连接
  • mqtt 消息的路由仅依赖 topic (允许通配),不存在负载均衡等复杂情况,只存在一个队列订阅的 topic 是否与发送方的 topic 匹配(可以一对多的形式)
  • 一个 client 可以声明一个遗嘱,在下线的时候这个遗嘱(类似一条消息,存在相应的 topic)被中间件发送
  • 一个 client 可以订阅多个 topic
  • 消息的格式没有严格的限制可以是一个字符串,也可以是图片的二进制数据

(题外话:这里可以看到 mqtt 依赖于中间件,并不是纯粹的端到端协议)

目标

  • 通过注解,声明式地定义一个 client 中收到的某个 topic 的消息的处理
  • 通过注解,将一个 client 中的方法的返回值作为消息发送到特定的 topic

以上即是全部

部分设计解释

  • @MqttClient 作为一个类注解,而 @SubscribeTopic,@PublishTopic 作为方法注解
  • mqtt 在大多数的使用场景下需要提前协商定义 topic,这也是我们预定义 topic 的依据
  • 因为一个 client 严格和一个 tcp/websocket 连接对应,无需维持连接池
  • 多个 client 对应多个类,那么为了尽量的使配置集中,使用注解的属性作为 client 属性,而不是通过配置文件,而中间件的共享配置使用配置文件
  • 该 spring-boot-starter 依赖于 spring-integration 和 Paho 注解需保留到运行时

部分实现解释

  • 将 Paho 层面的 client 暴露给容器,在初始化之后向容器注册,这的确是向用户展示了不该暴露的细节,一旦用户直接调用该 client,处理逻辑多半会有问题,但我相信这是值得的,用户有知晓当前层面所能达到的底层的自由
  • 注解及其上属性的的利用基于 BeanFactoryPostProcessor 与反射,进行消息组件和 Paho client 的装配
  • 利用 spring-integration-mqtt 的管道适配器和参数校验器,以及反序列化
  • spring-integration 对 client 基本没有管理,每次获取即创建一个新的 client,但定义了相关的接口,需要自己实现一个 client 复用的 factory,目前使用 HashMap 作为一个简易的缓存

仓库

GitHub - Kicey/spring-boot-starter-mqtt
Contribute to Kicey/spring-boot-starter-mqtt development by creating an account on GitHub.

题外话

写这个 starter 的过程中遇到一点 spring 注解支持方面的小问题,最后变成了对 spring-integration 项目的一次 PR,意外之喜。很高兴,但毕竟还是小事的范畴,继续加油。