spring-integration-mqtt builds on Paho and uses spring-integration's existing messaging mechanism to send and receive MQTT messages.

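The overall package structure is fairly simple.
The last three packages (inbound, outbound, support) are the most important.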
inbound

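The key point is that the adapter extends AbstractEndpoint and implements doStart(), which calls connectAndSubscribe() on startup. If that call fails, the error is logged and a reconnect is scheduled.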
@Override
protected void doStart() {
    Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
    try {
        connectAndSubscribe();
    }
    catch (Exception ex) {
        logger.error(ex, "Exception while connecting and subscribing, retrying");
        scheduleReconnect();
    }
}
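The start-then-retry behaviour of doStart() can be modelled without Spring or Paho. The following is a minimal sketch under stated assumptions: StartRetrySketch, Connector and the fixed retry delay are hypothetical names introduced for illustration, a plain ScheduledExecutorService stands in for the task scheduler, and the sketch simply re-runs start() where the real adapter calls scheduleReconnect().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for the adapter's start logic; every name here is hypothetical. */
final class StartRetrySketch {

    /** Stand-in for connectAndSubscribe(): may throw while the broker is unreachable. */
    interface Connector {
        void connect() throws Exception;
    }

    private final Connector connector;
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMillis;
    private final CountDownLatch connected = new CountDownLatch(1);

    StartRetrySketch(Connector connector, ScheduledExecutorService scheduler, long retryDelayMillis) {
        this.connector = connector;
        this.scheduler = scheduler;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Try once; on failure, schedule another attempt instead of failing the start. */
    void start() {
        try {
            connector.connect();
            connected.countDown();
        }
        catch (Exception ex) {
            scheduler.schedule(this::start, retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Waits until a connection attempt has succeeded or the timeout elapses. */
    boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Uses a connector that fails twice and then succeeds; returns the number of attempts. */
    static int demo() {
        AtomicInteger attempts = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            StartRetrySketch sketch = new StartRetrySketch(() -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("broker not reachable yet");
                }
            }, scheduler, 10);
            sketch.start();
            if (!sketch.awaitConnected(2000)) {
                throw new IllegalStateException("never connected");
            }
            return attempts.get();
        }
        finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("attempts = " + demo());
    }
}
```

The point of the design is that a failed first connection does not abort startup: the failure is absorbed and the attempt is repeated later on a scheduler thread.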
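connectAndSubscribe() is shown below. Its main tasks:
- Get the MqttConnectOptions from the factory
- Get the MQTT client from the factory
- Register the current class as the client's callback handler (it implements the MqttCallback interface)
- Connect the client to the broker with the connection options and subscribe to the configured topics
- If connecting or subscribing fails, publish an MqttConnectionFailedEvent through the ApplicationEventPublisher; on success, publish an MqttSubscribedEvent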
private synchronized void connectAndSubscribe() throws MqttException { // NOSONAR
    MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
    this.cleanSession = connectionOptions.isCleanSession();
    this.consumerStopAction = this.clientFactory.getConsumerStopAction();
    if (this.consumerStopAction == null) {
        this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
    }
    Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
            "If no 'url' provided, connectionOptions.getServerURIs() must not be null");
    this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
    this.client.setCallback(this);
    if (this.client instanceof MqttClient) {
        ((MqttClient) this.client).setTimeToWait(getCompletionTimeout());
    }
    this.topicLock.lock();
    String[] topics = getTopic();
    ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
    try {
        this.client.connect(connectionOptions);
        this.client.setManualAcks(isManualAcks());
        if (topics.length > 0) {
            int[] requestedQos = getQos();
            int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
            this.client.subscribe(topics, grantedQos);
            warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
        }
    }
    catch (MqttException ex) {
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
        }
        logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
        if (this.client != null) { // Could be reset during event handling before
            this.client.disconnectForcibly(this.disconnectCompletionTimeout);
            try {
                this.client.setCallback(null);
                this.client.close();
            }
            catch (MqttException e1) {
                // NOSONAR
            }
            this.client = null;
        }
        throw ex;
    }
    finally {
        this.topicLock.unlock();
    }
    if (this.client.isConnected()) {
        this.connected = true;
        String message = "Connected and subscribed to " + Arrays.toString(topics);
        logger.debug(message);
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
        }
    }
}
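The listing copies the requested QoS array before subscribing and afterwards compares the two arrays. This suggests that the client may overwrite the array it receives with the QoS values the broker actually granted. A minimal sketch of such a comparison follows, assuming that behaviour. GrantedQosSketch and describeMismatches are hypothetical names and not the library's warnInvalidQosForSubscription. The value 0x80 is the MQTT 3.1.1 SUBACK return code for a rejected subscription.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reports topics whose granted QoS differs from the requested one. */
final class GrantedQosSketch {

    /** MQTT 3.1.1 SUBACK return code meaning the broker rejected the subscription. */
    static final int SUBSCRIPTION_FAILURE = 0x80;

    /** Returns one human-readable warning per topic whose granted QoS is not the requested QoS. */
    static List<String> describeMismatches(String[] topics, int[] requestedQos, int[] grantedQos) {
        List<String> warnings = new ArrayList<>();
        for (int i = 0; i < topics.length; i++) {
            if (grantedQos[i] == SUBSCRIPTION_FAILURE) {
                warnings.add(topics[i] + ": subscription rejected by the broker");
            }
            else if (grantedQos[i] != requestedQos[i]) {
                warnings.add(topics[i] + ": requested QoS " + requestedQos[i]
                        + ", granted QoS " + grantedQos[i]);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        String[] topics = {"a", "b", "c"};
        int[] requested = {1, 2, 1};
        int[] granted = {1, 1, SUBSCRIPTION_FAILURE};
        describeMismatches(topics, requested, granted).forEach(System.out::println);
    }
}
```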
outbound

The core is the override of AbstractMessageHandler.handleMessageInternal; the publish method defined in the handler then takes care of sending the MQTT message to the broker.
@Override
protected void publish(String topic, Object mqttMessage, Message<?> message) {
    Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
    try {
        IMqttDeliveryToken token = checkConnection()
                .publish(topic, (MqttMessage) mqttMessage);
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (!this.async) {
            token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
        }
        else if (this.asyncEvents && applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(
                    new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
                            getClientInstance()));
        }
    }
    catch (MqttException e) {
        throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);
    }
}
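The branch on async in publish() is the part worth isolating: in synchronous mode the caller blocks until delivery completes or the timeout expires, while in asynchronous mode it returns at once and optionally emits a "sent" event. The sketch below models only that branch with the JDK. PublishModeSketch is a hypothetical name, a CompletableFuture stands in for the delivery token, and a plain list stands in for the event publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Models the sync/async branch of the outbound handler; all names are hypothetical. */
final class PublishModeSketch {

    private final boolean async;
    private final boolean asyncEvents;
    private final long completionTimeoutMillis;

    /** Stand-in for the application event publisher: records the events that were emitted. */
    final List<String> events = new ArrayList<>();

    PublishModeSketch(boolean async, boolean asyncEvents, long completionTimeoutMillis) {
        this.async = async;
        this.asyncEvents = asyncEvents;
        this.completionTimeoutMillis = completionTimeoutMillis;
    }

    /** deliveryToken stands in for the token returned by the client's publish call. */
    void publish(String topic, CompletableFuture<Void> deliveryToken) {
        if (!async) {
            // Synchronous mode: block until delivery completes or the timeout expires.
            try {
                deliveryToken.get(completionTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while publishing to " + topic, ex);
            }
            catch (ExecutionException | TimeoutException ex) {
                throw new IllegalStateException("Failed to publish to " + topic, ex);
            }
        }
        else if (asyncEvents) {
            // Asynchronous mode: return immediately and only record that the message was sent.
            events.add("sent:" + topic);
        }
    }

    public static void main(String[] args) {
        PublishModeSketch handler = new PublishModeSketch(true, true, 1000);
        handler.publish("sensors/temp", new CompletableFuture<>());
        System.out.println(handler.events);
    }
}
```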
support

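- MqttMessageConverter: converts Paho MQTT messages into spring-integration messages
- MqttHeaders defines the names of the MQTT message headers; MqttHeaderAccessor provides static methods for reading those headers without spelling out string literals
- MqttUtils provides a method for copying the connection options
- MqttHeaderMapper provides header support for MQTT 5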
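The pairing of a header-name holder with a static accessor can be illustrated in a few lines. This is a sketch of the pattern only: HeaderAccessSketch, its nested classes and the header values are hypothetical and are not copied from MqttHeaders or MqttHeaderAccessor, and a plain Map stands in for the message headers.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates "constants plus static accessor"; names and values are hypothetical. */
final class HeaderAccessSketch {

    /** Header names kept in one place so callers never type the literals themselves. */
    static final class Headers {
        static final String RECEIVED_TOPIC = "example_receivedTopic";
        static final String RECEIVED_QOS = "example_receivedQos";

        private Headers() {
        }
    }

    /** Typed, static access to the headers of a message. */
    static final class HeaderAccessor {
        private HeaderAccessor() {
        }

        static String receivedTopic(Map<String, Object> headers) {
            return (String) headers.get(Headers.RECEIVED_TOPIC);
        }

        static Integer receivedQos(Map<String, Object> headers) {
            return (Integer) headers.get(Headers.RECEIVED_QOS);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(Headers.RECEIVED_TOPIC, "sensors/temp");
        headers.put(Headers.RECEIVED_QOS, 1);
        System.out.println(HeaderAccessor.receivedTopic(headers) + " @ QoS "
                + HeaderAccessor.receivedQos(headers));
    }
}
```

Callers get a typed value and a compile-time checked name, so a misspelled header key becomes a compilation error rather than a silent null.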
The rest
config.xml
Responsible for parsing bean definitions from XML. Two beans can be defined in XML:
- MqttPahoMessageDrivenChannelAdapter
- MqttPahoMessageHandler
Both are handled by the two corresponding parsers in the config.xml package:
- MqttMessageDrivenChannelAdapterParser
- MqttOutboundChannelAdapterParser
The XML namespace itself is handled by MqttNamespaceHandler.
core

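Mainly defines the Paho client factory for MQTT: it holds the connection parameters and creates and returns the corresponding MQTT clients (Paho implementations, in synchronous and asynchronous variants). The package also defines the enum of actions to take when the consumer stops.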
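How a stop action might drive the unsubscribe decision can be sketched as follows. This is an assumption-laden illustration: StopActionSketch and its local StopAction enum are hypothetical stand-ins, only UNSUBSCRIBE_CLEAN appears in the listing above, and the "always" and "never" variants as well as the decision rule are assumed rather than quoted from the library.

```java
/** Hypothetical model of deciding whether to unsubscribe when the consumer stops. */
final class StopActionSketch {

    /** Local stand-in for the stop-action enum; only UNSUBSCRIBE_CLEAN appears in the source. */
    enum StopAction {
        UNSUBSCRIBE_ALWAYS,
        UNSUBSCRIBE_CLEAN,
        UNSUBSCRIBE_NEVER
    }

    /**
     * Assumed rule: always unsubscribe, never unsubscribe, or unsubscribe only
     * when the session is clean (so no subscription state survives on the broker).
     */
    static boolean shouldUnsubscribe(StopAction action, boolean cleanSession) {
        switch (action) {
            case UNSUBSCRIBE_ALWAYS:
                return true;
            case UNSUBSCRIBE_CLEAN:
                return cleanSession;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (StopAction action : StopAction.values()) {
            System.out.println(action + ": clean=" + shouldUnsubscribe(action, true)
                    + ", persistent=" + shouldUnsubscribe(action, false));
        }
    }
}
```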
event

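All of these extend Spring's ApplicationEvent and, depending on the event, hold the event source (the producer of the event), the event information (for example, an MQTT message), and the cause of the event.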
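The shape of such an event can be shown with the JDK alone, since Spring's ApplicationEvent itself extends java.util.EventObject. The sketch below is illustrative only: EventSketch and ConnectionFailedEvent are hypothetical names and do not reproduce the fields of the library's event classes.

```java
import java.util.EventObject;

/** Illustrates an event carrying its source and cause; names are hypothetical. */
final class EventSketch {

    /** Stand-in for a connection-failed event: keeps the producer and the failure cause. */
    static class ConnectionFailedEvent extends EventObject {

        private static final long serialVersionUID = 1L;

        private final Throwable cause;

        ConnectionFailedEvent(Object source, Throwable cause) {
            super(source);
            this.cause = cause;
        }

        Throwable getCause() {
            return cause;
        }
    }

    public static void main(String[] args) {
        Object adapter = "inbound-adapter"; // placeholder for the component that raised the event
        ConnectionFailedEvent event =
                new ConnectionFailedEvent(adapter, new IllegalStateException("connection lost"));
        System.out.println(event.getSource() + " failed: " + event.getCause().getMessage());
    }
}
```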