spring-integration-mqtt 在 Paho 之上,利用 spring-integration 原本的消息机制实现 mqtt 消息的发送和接收。

包整体结构比较简单

重要的是后 3 个

inbound

重点是通过继承 AbstractEndpoint 实现了 doStart() 方法,在启动时调用 connectAndSubscribe()。

@Override
protected void doStart() {
	Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
	try {
		connectAndSubscribe();
	}
	catch (Exception ex) {
		logger.error(ex, "Exception while connecting and subscribing, retrying");
		scheduleReconnect();
	}
}

connectAndSubscribe() 如下,主要完成的工作:

  • 从工厂获取 ConnecitonOptions
  • 从工厂获取 MqttClient
  • client 设置 ConnectOptions 创建于中间件的连接并设置订阅
  • 将当前类作为 client 中的回调处理器(实现了 MqttCallback 接口)
  • 如果启动失败,通过 ApplicationEventPublisher 发布这个失败事件
private synchronized void connectAndSubscribe() throws MqttException { // NOSONAR
	MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
	this.cleanSession = connectionOptions.isCleanSession();
	this.consumerStopAction = this.clientFactory.getConsumerStopAction();
	if (this.consumerStopAction == null) {
		this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
	}
	Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
			"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
	this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
	this.client.setCallback(this);
	if (this.client instanceof MqttClient) {
		((MqttClient) this.client).setTimeToWait(getCompletionTimeout());
	}
	this.topicLock.lock();
	String[] topics = getTopic();
	ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
	try {
		this.client.connect(connectionOptions);
		this.client.setManualAcks(isManualAcks());
		if (topics.length > 0) {
			int[] requestedQos = getQos();
			int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
			this.client.subscribe(topics, grantedQos);
			warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
		}
	}
	catch (MqttException ex) {
		if (applicationEventPublisher != null) {
			applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
		}
		logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
		if (this.client != null) { // Could be reset during event handling before
			this.client.disconnectForcibly(this.disconnectCompletionTimeout);
			try {
				this.client.setCallback(null);
				this.client.close();
			}
			catch (MqttException e1) {
				// NOSONAR
			}
			this.client = null;
		}
		throw ex;
	}
	finally {
		this.topicLock.unlock();
	}
	if (this.client.isConnected()) {
		this.connected = true;
		String message = "Connected and subscribed to " + Arrays.toString(topics);
		logger.debug(message);
		if (applicationEventPublisher != null) {
			applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
		}
	}
}

outbound

核心是重写了 AbstractMessageHandler.handleMessageInternal 方法,通过内部定义的 publish 方法处理 mqtt 消息向中间的发送。

@Override
protected void publish(String topic, Object mqttMessage, Message<?> message) {
	Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
	try {
		IMqttDeliveryToken token = checkConnection()
				.publish(topic, (MqttMessage) mqttMessage);
		ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
		if (!this.async) {
			token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
		}
		else if (this.asyncEvents && applicationEventPublisher != null) {
			applicationEventPublisher.publishEvent(
					new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
							getClientInstance()));
		}
	}
	catch (MqttException e) {
		throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);
	}
}

support

  • MqttMessageConverter:将 Paho mqtt 消息转化为 spring-integaration 消息
  • MqttHeaders 定义了 mqtt 消息头的 pojo,MqttHeaderAccessor 提供静态方法访问而不用指定字面量访问头信息
  • MqttUtils 提供拷贝 ConnectionOptions 的方法
  • MqttHeaderMapper 提供 Mqtt5 的头信息支持

其余

config.xml

负责从 xml 中解析 bean 定义,通过 .xml 可以定义两个 bean:

  • MqttPahoMessageDrivenChannelAdapter
  • MqttPahoMessageHandler

都由 config.xml 包中两个对应的 parser 完成:

  • MqttMessageDrivenChannelAdapterParser
  • MqttOutboundChannelAdapterParser

而 xml 命令空间由 MqttNamespaceHandler 处理

core

主要定义了 mqtt 的 Paho 工厂,为工厂设置连接的参数,以及创建和返回相应的 mqtt 客户端(Paho 实现,同步,异步两种类型),以及使用者停止后的动作的枚举值。

event

都是继承自 spring 中的 ApplicationEvent,持有时间的源头(事件的产生者,事件的信息,比如一个 mqtt 消息,以及事件的产生原因)。