【问题标题】:Execute Spring Integration flows in parallel并行执行 Spring Integration 流
【发布时间】:2019-07-16 15:09:47
【问题描述】:

如果我有一个像这样的简单集成流:

@Bean
public IntegrationFlow downloadFlow() {
    return IntegrationFlows.from("rabbitQueue1")
        .handle(longRunningMessageHandler)
        .channel("rabbitQueue2")
        .get();
}

...如果rabbitQueue1 充满了消息,

我应该怎么做才能同时处理多条消息?那可能吗? 似乎默认情况下,处理程序一次执行一条消息。

【问题讨论】:

    标签: rabbitmq spring-integration amqp


    【解决方案1】:

    是的,这是真的,默认情况下端点与DirectChannel 连接。这就像一条一条地执行一条普通的 Java 指令。因此,要在 Java 中执行一些并行工作,您需要一个 Executor 来将调用转移到单独的线程。

    同样可以通过ExecutiorChannel 使用 Spring Integration。您可以将 rabbitQueue1 设为 ExecutorChannel bean 或使用它来代替普通名称:

    IntegrationFlows.from(MessageChannels.executor("rabbitQueue1", someExecturorBean)
    

    所有到达该通道的消息都将在执行程序提供的线程中并行处理。 longRunningMessageHandler 将并行处理您的消息。

    在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#channel-implementations

    【讨论】:

    • 如果我将 ThreadPoolTask​​Executor Bean 添加到 queuecapac=50、corepoolsize=5、maxpoolsize=10 的上下文中会发生什么?
    • ExecutorChannel 只不过是提供的Executor 的委托人。不确定您在上下文中的问题是什么......
    • 假设您使用的是消息驱动的通道适配器,除非您手动确认消息,否则不应使用ExecutorChannel。否则,您可能会在失败后丢失正在处理的消息。而是增加入站通道适配器中的concurrency 以同时处理多条消息。您可能还需要从默认值 (250) 中减少预取,以便消息分布在容器线程中。
    猜你喜欢
    • 2023-01-12
    • 1970-01-01
    • 2015-10-18
    • 1970-01-01
    • 1970-01-01
    • 2018-10-02
    • 2017-02-14
    • 2018-04-08
    • 1970-01-01
    相关资源
    最近更新 更多