【发布时间】:2019-07-15 15:33:16
【问题描述】:
在具有 AMQP 与 RabbitMQ 集成的 Spring 集成 应用程序中,我们会遇到意外行为。
Spring Integration 应用程序(java 配置,dsl)由 3 个流和 2 个持久队列组成。
假设:flow1 -> queue1 -> flow2 -> queue2 -> flow3
flow1 以 Message 开头,最终拆分为 50 条消息 (.split())。第一个流写入 AMQP/Rabbit MQ 队列。
在 Rabbit UI 中,我们观察到从 0 条消息到 50 条消息的跳跃。到目前为止还好。
然后我认为“确认”随之而来,兔子中的 50 件商品可以说是对消费者可见。
然后 flow2 从此队列中读取并开始处理消息。每条消息的处理大约需要 5 秒。处理完消息后,将其写入下一个队列 (queue2)。
出乎意料的行为是 queue2 被填满,直到 queue1 中的所有 50 个都被处理(大约 250 秒后)。
我假设 queue1 和 queue2 之间的 flow2 在一个事务中处理所有传入请求。并且它只会在 queue1 上的所有项目都处理完后才确认 queue2 上的新消息。
我什至认为我遇到过这样一种情况,即在 queue1 尚未为空时将更多项目插入其中。然后,在 flow2 中处理了最初的 50 个元素后,它仍然没有在 queue2 中确认它们。它似乎只在 queue1 完全为空后才确认项目。
然后 flow3 以相同的方式开始处理:它只在 queue1 中的所有内容都被 处理后才看到 queue2 中的项目流2。
效果是50条消息是分批处理而不是逐条处理。只要有 1 条消息从 .split() 流出,我希望它单独流过所有流。那么,在 spring-integration、amqp 或 rabbit-mq 中是否有默认设置为在整个工作负载上创建事务?
我是否需要强制消费者只选择 1 条消息并围绕该消息创建交易?或者,我应该单独“确认”消息吗?还是应该在 java config 中以更一般的方式配置行为?
我最初的想法是 DSL .split() 逻辑是原因。它添加了相关 id 和序列信息等标题。 (我想这是为了允许聚合器计算是否所有内容都已处理)。为清楚起见:我的应用中没有定义(显式)聚合器。
我的第一种方法是在插入 queue1 之前清除拆分聚合标头。但这并没有奏效。
.split(s -> s.transactional(false)) 也没有规避这一点。
编辑:
忘记上面的流/队列命名。这是我的 Spring 集成代码。我想我在这里包含了最相关的 bean。
第一阶段从轮询器创建空消息。这些是触发对提要(json 中的 50 个项目)的请求的事件。 50 个项目(拆分)中的每一个都保存在第一个兔子队列中。
然后第二阶段开始(传入的 amqp 消息被丢弃在 myChannel2 中)。通过 myChannel3 和 myChannel4 最终被持久化在第二个兔子队列中。
这两个阶段是并行处理的。我看到 FIRST_RABBIT_QUEUE 每次都被 50 条新消息填满。
我还看到执行了第二阶段: SECOND_RABBIT_QUEUE 被填满(并且第一个队列的计数器减少了)。一切都好。
但是现在 SECOND_RABBIT_QUEUE 不断增长,并且从未由myFlow3 处理。
如果第一个队列的增长速度快于它被清空的速度,则两个队列(第一个、第二个)都会继续增长。然而,当它被清空时(计数器归零),第三阶段(myFlow3)开始工作!
我的配置:
@Bean
public MessageChannel myChannel1() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel myChannel2() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel myChannel3() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel myChannel4() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel myChannel5() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel myChannel6() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(AMQP_OUTPUT)
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName(AmqpConfiguration.TOPIC_EXCHANGE)
.routingKeyExpression("headers['queueRoutingKey']"))
.get();
}
private HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
router.setChannelMapping(AmqpConfiguration.FIRST_RABBIT_QUEUE, "myChannel2");
router.setChannelMapping(AmqpConfiguration.SECOND_RABBIT_QUEUE, "myChannel5");
router.setResolutionRequired(false);
router.setDefaultOutputChannelName("errorChannel");
return router;
}
@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
final Queue firstRabbitQueue,
final Queue secondRabbitQueue,
final Queue thirdRabbitQueue,
final Jackson2JsonMessageConverter jackson2MessageConverter) {
simpleMessageListenerContainer.setQueues(
firstRabbitQueue,
secondRabbitQueue,
thirdRabbitQueue
);
return IntegrationFlows.from(
Amqp.inboundAdapter(simpleMessageListenerContainer)
.messageConverter(jackson2MessageConverter))
.headerFilter("queueRoutingKey")
.route(router())
.get();
}
@Bean
public IntegrationFlow myFlow0() {
return IntegrationFlows.<MessageSource>from(
() -> new GenericMessage<>("trigger flow1"),
c -> c.poller(Pollers.fixedRate(getPeriod(), initialDelay)))
.channel(myChannel1())
.get();
}
@Bean
public IntegrationFlow myFlow1() {
return IntegrationFlows.from(myChannel1())
.handle(String.class, (p, h) -> {
try {
return getLast50MessagesFromWebsite();
} catch (RestClientException e) {
throw new AmqpRejectAndDontRequeueException(e);
}
})
.split()
.enrichHeaders(h -> h.header("queueRoutingKey", AmqpConfiguration.FIRST_RABBIT_QUEUE))
.channel(AMQP_OUTPUT) // persist in rabbit
.get();
}
@Bean
public IntegrationFlow myFlow2_1() {
return IntegrationFlows.from(myChannel2())
.handle(this::downloadAndSave)
.channel(myChannel3())
.get();
}
@Bean
public IntegrationFlow myFlow2_2() {
return IntegrationFlows.from(myChannel3())
.transform(myDomainObjectTransformer)
.handle(this::persistGebiedsinformatieLevering)
.channel(myChannel4())
.get();
}
@Bean
public IntegrationFlow myFlow2_3() {
return IntegrationFlows.from(myChannel4())
.handle(this::confirmMessage)
.enrichHeaders(h -> h.header("queueRoutingKey", AmqpConfiguration.SECOND_RABBIT_QUEUE))
.channel(AMQP_OUTPUT) //persist in rabbit
.get();
}
@Bean
public IntegrationFlow myFlow3() {
return IntegrationFlows.from(myChannel5())
.log(LoggingHandler.Level.INFO)
.get();
}
【问题讨论】:
标签: java spring rabbitmq spring-integration amqp