【问题标题】:Is Spring Integration Aggregator single threaded executionSpring Integration Aggregator 是单线程执行吗
【发布时间】:2018-10-02 20:21:18
【问题描述】:

我在我的应用程序中使用拆分器聚合器模式。 我有以下配置 -

<int:splitter inputChannel="CH1" outputChannel="CH2" ref="foo" method="bar"/>
<int:aggregator inputChannel="CH2" outputChannel="CH3" ref="oof" method="rab" ---other required configurations/>
<task:scheduler --with threadpool of size 30 />
<!--there are other channels as well. Some are queue and some are direct channels-->

我所有的频道(CH1、CH2、CH3)都是QueueChannel。 分配器输入通道 CH1 的源是一个文件。

在我的测试中,我观察到即使在 CH1 通道中添加两个文件,在给定时间也只会处理 1 个文件。所以我在我的 CH1 频道中添加了一个 Poller,现在 CH1 频道上的多个输入消息正在同时处理。

在聚合器方面,我观察到执行始终是单线程的,即直到第一个线程完成执行,第二个线程才开始执行。

到目前为止我为解决这个问题所做的尝试 -

  1. 将轮询器添加到 &lt;int:aggregate&gt; 配置
  2. 将轮询器添加到输入通道 CH2(输入到聚合器)
  3. 将调度程序添加到输入通道 CH2

谁能帮我让聚合器也处理多线程。

更新1 我正在尝试做的事情:

我收到包含股票数据的文件。我为每个文件创建消息并将其放入 CH1 频道。 CH1 通道有一个拆分器,将其拆分为单独的记录并将其发送到 CH2 通道。 CH2 频道有一个与之关联的聚合器,它将尝试根据文件名聚合记录,并且“oof.rab”将记录写入数据库。我的期望是看到多个批次的同一文件同时写入数据库。

更新 2: 此处进行聚合以批处理行,以便我们可以将其以批处理模式将其写入数据库,而不是单独写入行

【问题讨论】:

    标签: spring multithreading spring-integration


    【解决方案1】:

    聚合器是多线程的,但仅适用于不同的消息组(相关性)。每个组都必须是单线程的,因为对于每个消息到达,我们都必须咨询 ReleaseStrategy。

    【讨论】:

    • 假设关联逻辑是基于文件名的。据我了解,聚合器可能会同时处理两个不同文件的批量消息,但对于单个文件,它的批次将按顺序处理,即一批接一批
    • 如果这种理解是正确的,聚合器永远不会扩展
    • 不清楚你想做什么;聚合通常是流程的一小部分 - 通常在拆分器和聚合器之间进行更多工作。也许您可以解释您的完整用例,以便我们了解您正在尝试做的事情并提出其他建议。聚合器可以很好地扩展,但每个单独的聚合都必须是单线程的。
    • 完全不清楚为什么需要聚合器,为什么不直接使用服务激活器来写入记录,而不是先等待它们全部聚合到消息组中?
    • Splitter 组件拆分文件并返回行。我不是将单独的行一一写入数据库,而是使用聚合器对行进行批处理(以减少数据库往返次数,从而潜在地提高性能)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-04
    • 1970-01-01
    • 2018-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多