spring integration 的消息框架存在两种方式:发布-订阅的及时的消息村里,和可拉取的可缓存消息处理,messaging endpoints 相应的也提供两类实现,所以当开发者实现消息处理的逻辑时不需要考虑“推”和“拉”的问题,由 messaging endpoint 来兼容两种不同的消息处理方式。

Messaging Endpoints

MessageHandler

MessageHandler 作为其他组件的父接口,定义如下:

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

EventDrivenConsumer 作为最简单 MessageHandler 只需要处理回调以及对应的 channel 即可。

而 PollableConsumer 还需要设置一个 Trigger 或者 TaskExecutor 作为消息拉取的触发器。TaskExecutor 的方式类似线程池,可以并发的处理消息,需要注意的是以这种方式处理消息是一个异步的过程,并不会阻塞消息的拉取,可能造成因消费过慢,消息堆积而形成内存的泄露。而 Trigger 的处理是在一个线程中进行的,MessageHandler 的调用在 Trigger 的线程中执行。值得注意的是消息的拉取速率是动态的,运行时可更改。

xml 名称空间支持

<int:poller>

<int:poller> 支持 <int:transactional> 名称空间,获取事务支持(int 是 integration 的缩写)。由于 spring 中的事务支持是由代理机制具体是一个 TransactionInterceptor 作为一个 AOP 的切面完成的。那么 <int:ooller> 也支持 <int:advice-chain> 支持 MethodInterceptor 接口的 AOP。TaskExecutro 的支持可以使用 <task:executor> 完成。

消息负载的类型转换

配置示例如下:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>
<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}
@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}
需要注意的是 Spring 存在两种负责类型转换的 bean, ConversionService 和 IntegrationConversionService,前者负载 bean 装配中的类型转换,后者负责运行时消息中的类型转换。二者职责相似,可以但是不建议混合使用。

内容类型转换

MessagingHandler 的调用基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 框架,其中的 HandlerMethodArgumentResolver (例如 PayloadArgumentResolverMessageMethodArgumentResolver)实现会使用 MessageConverter 实现消息的 payload 的类型的转换,这是基于消息头中的 contentType 完成的。spring 提供了一个 MessageConverter 的实现 ConfigurableCompositeMessageConverter 将类型的转换委托给一系列的转换器( MappingJackson2MessageConverterByteArrayMessageConverterObjectStringMessageConverterGenericMessageConverter),还可以添加自定义的转换器,其中一个完成转换则成功。

Messaging Endpoint 内部的 bean

Messaging Endpoint 的内部会由多个 bean 构成,包括消息的实际消费者和负责拉取的管道适配器,以及适配器委派的 MessageSource 等。其中 MessageHandler 实例以 consumer_name.handler 作为 bean 名称。

Endpoint 角色

可以为多个 Messaging Endpoint 分配角色,以控制多个 Endpoint 的启用和停止。

这是由 IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER 实现的,这个类接收 Endpoint 的注册(在注册的同时提供这个 Endpoint 的角色),提供以下控制方法:

public Collection<String> getRoles() 

public boolean allEndpointsRunning(String role) 

public boolean noEndpointsRunning(String role) 

public Map<String, Boolean> getEndpointsRunningStatus(String role) 

主节点事件

Endpoint 的角色属性为其增加了对集群角色(主,从,候选)的支持,spring 中使用 leader initiator 处理角色变更的事件。提供一个抽象实现

@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
    return new LockRegistryLeaderInitiator(locks);
}

Messaging Gateway

内容在单独的博客中:spring MessagingGateway 简介 (kicey.site)

Service Activator

这是一个 Endpoint 负责消息的实际处理,可以指定输入和输出管道,在没有指定输出管道时,将会使用消息中的响应管道(replyChannel),如果不需要产生新的消息,指定输出管道为 NullChannel。如果需要响应的话,处理消息的方法返回值不能为 void,具体使用哪一个处理方法由消息头信息和消息负载类型决定。