【问题标题】:message ordering and parallel processing apache camel消息排序和并行处理apache骆驼
【发布时间】:2018-05-05 21:35:53
【问题描述】:

我想处理一个自定义 java 对象列表,使用骆驼拆分器拆分这些对象,并希望在并行线程中处理它。但我面临的挑战是,自定义对象列表是根据 Id 排序的,它必须写在一个文件中。

一旦我使用并行处理,顺序就会被打乱。我浏览了几篇要求使用“重新排序器”或“单线程”的文章。 但是使用单线程,处理 5k 条记录需要大量时间。

任何线索都会非常有帮助。 谢谢 尼丁

【问题讨论】:

  • 我假设你的 java 对象代表一些数据?为什么不将 java 对象作为 json 数据或其他格式输出到消息队列中,让另一个路由消费和处理来做你想做的事。
  • Nitin,您可以在拆分标签本身中添加 parallelprocessing = true。所以所有的记录都会被同时处理,它不会等待一个完成。

标签: parallel-processing apache-camel


【解决方案1】:

我在基于标签“XXX”拆分 XML 请求时遇到了类似的问题。然后处理拆分的请求并聚合成响应。 聚合响应的顺序与请求的顺序不同。

FIX : 该问题已通过在拆分器 EIP 中使用聚合“strategyRef”解决。

示例代码:

    <route>
        <from id="_from1" uri="activemq:DocumentGenerationQueue" />
        <split  parallelProcessing="true" streaming="false"  strategyRef = "AggregateTask" >
        <tokenize token="XXX" xml="true" />
            <to id="_to71"  uri="bean:ProcessorBean" />
            <to id="_to72"  uri="activemq:SplittedResponseQueue" />
        </split>
        <to uri="activemq:AggregatedResponseQueue" />
    </route>

【讨论】:

    【解决方案2】:

    您可以创建一个AggregationStrategy 的实例来比较newExchangeoldExchange 的结果,并创建一个resultExchange,其中包含基于id 的自定义java 对象的排序列表。

    但是使用单线程,处理 5k 条记录需要大量时间。

    您必须小心,因为您可能不想启动 5k 并行线程,而是创建自己的线程池,并将其附加到拆分中,并使用 executorServiceRef。通过这种方式,您可以控制线程数并处理队列已满时的操作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-24
      • 2011-08-27
      相关资源
      最近更新 更多