【问题标题】:How to make Spring Integration AMQP queue transactions decoupled?如何让 Spring Integration AMQP 队列事务解耦?
【发布时间】:2019-07-15 15:33:16
【问题描述】:

在具有 AMQPRabbitMQ 集成的 Spring 集成 应用程序中,我们会遇到意外行为。

Spring Integration 应用程序(java 配置,dsl)由 3 个流和 2 个持久队列组成。

假设:flow1 -> queue1 -> flow2 -> queue2 -> flow3

flow1 以 Message 开头,最终拆分为 50 条消息 (.split())。第一个流写入 AMQP/Rabbit MQ 队列。 在 Rabbit UI 中,我们观察到从 0 条消息到 50 条消息的跳跃。到目前为止还好。 然后我认为“确认”随之而来,兔子中的 50 件商品可以说是对消费者可见。

然后 flow2 从此队列中读取并开始处理消息。每条消息的处理大约需要 5 秒。处理完消息后,将其写入下一个队列 (queue2)。

出乎意料的行为是 queue2 被填满,直到 queue1 中的所有 50 个都被处理(大约 250 秒后)。

我假设 queue1queue2 之间的 flow2 在一个事务中处理所有传入请求。并且它只会在 queue1 上的所有项目都处理完后才确认 queue2 上的新消息。

我什至认为我遇到过这样一种情况,即在 queue1 尚未为空时将更多项目插入其中。然后,在 flow2 中处理了最初的 50 个元素后,它仍然没有在 queue2 中确认它们。它似乎只在 queue1 完全为空后才确认项目。

然后 flow3 以相同的方式开始处理:它只在 queue1 中的所有内容都被 处理后才看到 queue2 中的项目流2

效果是50条消息是分批处理而不是逐条处理。只要有 1 条消息从 .split() 流出,我希望它单独流过所有流。那么,在 spring-integration、amqp 或 rabbit-mq 中是否有默认设置为在整个工作负载上创建事务?

我是否需要强制消费者只选择 1 条消息并围绕该消息创建交易?或者,我应该单独“确认”消息吗?还是应该在 java config 中以更一般的方式配置行为?

我最初的想法是 DSL .split() 逻辑是原因。它添加了相关 id 和序列信息等标题。 (我想这是为了允许聚合器计算是否所有内容都已处理)。为清楚起见:我的应用中没有定义(显式)聚合器。

我的第一种方法是在插入 queue1 之前清除拆分聚合标头。但这并没有奏效。

.split(s -> s.transactional(false)) 也没有规避这一点。

编辑

忘记上面的流/队列命名。这是我的 Spring 集成代码。我想我在这里包含了最相关的 bean。

  1. 第一阶段从轮询器创建空消息。这些是触发对提要(json 中的 50 个项目)的请求的事件。 50 个项目(拆分)中的每一个都保存在第一个兔子队列中。

  2. 然后第二阶段开始(传入的 amqp 消息被丢弃在 myChannel2 中)。通过 myChannel3 和 myChannel4 最终被持久化在第二个兔子队列中。

这两个阶段是并行处理的。我看到 FIRST_RABBIT_QUEUE 每次都被 50 条新消息填满。 我还看到执行了第二阶段: SECOND_RABBIT_QUEUE 被填满(并且第一个队列的计数器减少了)。一切都好。 但是现在 SECOND_RABBIT_QUEUE 不断增长,并且从未由myFlow3 处理。

如果第一个队列的增长速度快于它被清空的速度,则两个队列(第一个、第二个)都会继续增长。然而,当它被清空时(计数器归零),第三阶段(myFlow3)开始工作!

我的配置:

@Bean
public MessageChannel myChannel1() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel myChannel2() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel myChannel3() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel myChannel4() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel myChannel5() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel myChannel6() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate) {
    return IntegrationFlows.from(AMQP_OUTPUT)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                    .exchangeName(AmqpConfiguration.TOPIC_EXCHANGE)
                    .routingKeyExpression("headers['queueRoutingKey']"))
            .get();
}


private HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
    router.setChannelMapping(AmqpConfiguration.FIRST_RABBIT_QUEUE, "myChannel2");
    router.setChannelMapping(AmqpConfiguration.SECOND_RABBIT_QUEUE, "myChannel5");
    router.setResolutionRequired(false);
    router.setDefaultOutputChannelName("errorChannel");
    return router;
}

@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
                                                     final Queue firstRabbitQueue,
                                                     final Queue secondRabbitQueue,
                                                     final Queue thirdRabbitQueue,
                                                     final Jackson2JsonMessageConverter jackson2MessageConverter) {
    simpleMessageListenerContainer.setQueues(
            firstRabbitQueue,
            secondRabbitQueue,
            thirdRabbitQueue
    );
    return IntegrationFlows.from(
            Amqp.inboundAdapter(simpleMessageListenerContainer)
                    .messageConverter(jackson2MessageConverter))
            .headerFilter("queueRoutingKey")
            .route(router())
            .get();
}

@Bean
public IntegrationFlow myFlow0() {
    return IntegrationFlows.<MessageSource>from(
            () -> new GenericMessage<>("trigger flow1"),
            c -> c.poller(Pollers.fixedRate(getPeriod(), initialDelay)))
            .channel(myChannel1())
            .get();
}

@Bean
public IntegrationFlow myFlow1() {
    return IntegrationFlows.from(myChannel1())
            .handle(String.class, (p, h) -> {
                try {
                    return getLast50MessagesFromWebsite();
                } catch (RestClientException e) {
                    throw new AmqpRejectAndDontRequeueException(e);
                }
            })
            .split()
            .enrichHeaders(h -> h.header("queueRoutingKey", AmqpConfiguration.FIRST_RABBIT_QUEUE))
            .channel(AMQP_OUTPUT) // persist in rabbit
            .get();
}

@Bean
public IntegrationFlow myFlow2_1() {
    return IntegrationFlows.from(myChannel2())
            .handle(this::downloadAndSave)
            .channel(myChannel3())
            .get();
}

@Bean
public IntegrationFlow myFlow2_2() {
    return IntegrationFlows.from(myChannel3())
            .transform(myDomainObjectTransformer)
            .handle(this::persistGebiedsinformatieLevering)
            .channel(myChannel4())
            .get();
}

@Bean
public IntegrationFlow myFlow2_3() {
    return IntegrationFlows.from(myChannel4())
            .handle(this::confirmMessage)
            .enrichHeaders(h -> h.header("queueRoutingKey", AmqpConfiguration.SECOND_RABBIT_QUEUE))
            .channel(AMQP_OUTPUT) //persist in rabbit
            .get();
}

@Bean
public IntegrationFlow myFlow3() {
    return IntegrationFlows.from(myChannel5())
            .log(LoggingHandler.Level.INFO)
            .get();
}

【问题讨论】:

    标签: java spring rabbitmq spring-integration amqp


    【解决方案1】:

    默认情况下从不启用事务,所以我认为这不是问题(除非您已明确启用它们)。

    你所描述的很奇怪。请记住,UI 不是实时的,它只是每隔几秒更新一次,所以你看到它从 0 到 50 “跳跃”也就不足为奇了。

    它似乎只在 queue1 完全为空后才确认项目。

    消费者对队列或其内容一无所知。

    .split(s -> s.transactional(false)
    

    您根本不应该这样做;那是启用事务(我很惊讶它完全可以工作,因为它应该需要事务管理器)但只要出站适配器没有事务性RabbitTemplate 就无关紧要了。

    您需要显示您的流程定义和任何配置属性以供任何人进一步帮助。

    【讨论】:

    • 好吧,我不应该称之为交易。我熟悉以提交结尾的 RDMS 事务。我假设“确认”与提交相当。因此,我将其称为事务...我知道 UI 不是实时的。您是说 50 条消息是逐条添加的,并且也是逐条“确认”的吗?关于消费者知道队列:这是我的说法:在源队列确认消息之前,消费者不会采取行动。
    • 发布者端没有“ack”(除非您使用事务)。已发布的消息可立即用于消费(只要不使用事务)。
    • 好的,很高兴知道。确认模式 AUTO 并且没有交易是我拥有(并且拥有)的。在阅读了 TRACE 日志后,我了解到 flow2 实际上是在 queue1 被填满时开始的。这样 确实 并行工作。然而,尽管 queue2 已满,但 flow3 并未启动。你知道为什么会发生这种情况吗?是什么让 Spring 集成“同时”观察多个队列?多线程?还是工人?例如,如何确保 flow2 并行运行多个实例?以及如何并行运行flow2和flow3?
    • 我需要查看您的配置,但应该没有什么可以阻止 flow3 在 flow2 生成消息时消费消息。我唯一能想到的是,如果您将 DirectMessageListenerContainers 与线程不足的共享任务执行器一起使用;作为discussed here
    • 很高兴 Spring Integration 提供了简化 EIP 对象的可能性。但我经常对像现在这样的幕后复杂性感到困惑。我会尝试在上面的问题中添加更多的配置细节。
    猜你喜欢
    • 2014-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多