主要接口定义

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}
public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}
public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

具体实现

PublishSubscribeChannel

发布订阅管道,同一个消息可以被多个订阅者接收,订阅者无法主导进行拉取(消息不缓存)。

QueueChannel

队列管道,同一个消息只能被一个订阅者接收,即使管道存在多个消息的消费者。消息会被缓存,那么就需要考虑消息容量的问题了,达到容量之后再发送消息发送端点将被阻塞(可以设置 timeout )。

PriorityChannel

和 QueueChannel 类似与 Queue 与 PriorityQueue,默认使用消息 Header 里的 priority 字段,同时支持 Comparator。

RendezvousChannel

在发出的消息被处理之前,发送端会被一直阻塞。

DirectChannel

不缓存消息,一个消息只会发送给一个消费者。类似于程序方法的调用,即发送和消息的接收和处理可以在一个线程中完成,那么它是支持事务的。类似于 rabbit mq 中的工作队列,二者都可以使用负载均衡,消息处理异常重发等。

ExecutorChannel

和 DirectChannel 大体相似,主要的区别在于消息的分发是由一个 TaskExecutor 实现,从而发送这个动作是异步的,所以不支持事务。

FluxMessageChannel

专门为响应式的编程设计,仅实现独立的一个接口 ReactiveStreamsSubscribableChannel,只有 org.reactivestreams.Subscriber 实例可以作为消费者。

Scoped Channel

定义一个 channel 的作用范围,作用范围之外的接收者无法接收。

管道拦截器

接口定义

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

在实现之后向管道注册

channel.addInterceptor(someChannelInterceptor);

MessagingTemplate

类似于 JdbcTemplate, RedisTemplate 可以自由的发送和接收消息,一些常用的方法签名如下:

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}