【问题标题】:Spring Cloud Stream @StreamListener and Spring Integration's Resequencer PatternSpring Cloud Stream @StreamListener 和 Spring Integration 的 Resequencer 模式
【发布时间】:2019-04-25 02:01:18
【问题描述】:
AFAIK Spring Cloud Stream 项目基于 Spring Integration。因此,我想知道在触发StreamListener 处理程序之前是否有一种很好的方法来重新排序入站消息的子集?还是我需要使用 Spring Integration 中的 XML 或 Java DSL 配置从头开始组装整个 IntegrationFlow?
我的用例如下。大多数时候,我会在 Kafka 主题上处理入站消息。但是,必须根据 CORRELATION_ID、SEQUENCE_NUMBER 和 SEQUENCE_SIZE 标头对一些事件重新排序。换句话说,我想尽可能多地使用 StreamListener,并简单地为某些事件插入重新排序策略。
【问题讨论】:
标签:
spring-integration
spring-cloud-stream
enterprise-integration
【解决方案1】:
是的,您需要为此使用 Spring Integration。实际上 Spring Cloud Stream 只是一个有效的绑定框架。它通过绑定器将消息处理程序绑定到消息代理。 消息处理程序本身由用户提供。
@StreamListener 注解几乎等同于 Spring Integration 的 @ServiceActivator,只有很少的额外功能(例如,条件路由),但除此之外它只是一个 消息处理程序。
现在,正如您所避免的,您知道您可以使用 Spring Integration (SI) 来实现 消息处理程序 或内部 SI 流,这是正常的,建议用于复杂情况。
也就是说,我们确实提供了实现某些 EIP 组件的开箱即用的应用程序,例如,我们确实有 aggregator 应用程序,您可以将其用作实现重测序器的起点。此外,鉴于我们有一个 aggregator 应用程序而不是 resequencer,如果您有兴趣,我们很乐意接受它的贡献。
我希望这能回答你的问题。