【问题标题】:Apache Camel: Discarding Split Processors When CompleteApache Camel:完成后丢弃拆分处理器
【发布时间】:2014-03-27 02:53:05
【问题描述】:

我正在使用具有并行处理功能的 Camel Split 组件来拆分整数的 ArrayList。该列表中有 700,000 多个 ID。这个想法是每个 ID 都需要传递给 XML 生成器,然后插入到数据库表中。但是,我注意到在消息正文中生成并传递回 Camel 的 XML 似乎被保留了。看起来它会保留消息正文,直到拆分中的所有元素都完成处理。这意味着我将在内存中拥有 700,000 多个 XML。有没有办法告诉骆驼不要这样做?要在 Split 工作人员完成工作后丢弃它?

我尝试过制作 ID 块并拆分每个块,以便完成拆分并释放内存。这里的问题是每个 ID 都会生成不同大小的 XML。因此,如果我创建了 10 个 ID 的块,其中 1 个与一个非常大的 XML 相关联,我的线程池必须等待最大的一个完成,我最终会得到 9 个空闲线程。

【问题讨论】:

    标签: java multithreading split apache-camel


    【解决方案1】:

    您查看Splitter EIP reference on Streambased splitting 了吗?它似乎与您的问题直接相关。使用 streaming() 调用,也许还有自定义聚合器,您可能会更幸运

    您可能会遇到拆分器聚合器的默认行为,它会从您将拆分消息传递到的任何处理器中获取所有“回复”,然后将它们重新组合在一起以作为单个“响应”从分离器处理器到您路线中的下一个处理器。因此内存中的 XML 负载很大。

    如果您编写自定义聚合器,则可以改为只记录已编组和持久化的消息计数,并将其作为“响应”从拆分器传递,这应该显式地从内存中释放 XML 有效负载。我会举一个更清楚的例子,但我的骆驼这些天已经生锈了。

    我希望我在这里留下了足够的面包屑来提供帮助。很大程度上,看看我在这个答案顶部链接到的文档,有许多很好的示例似乎直接或几乎适用于您的用例。

    【讨论】:

    • 感谢您的意见。实际上,我已经在使用流媒体选项了。我正在研究自定义聚合,只是希望可能有一些开箱即用的魔法,因为这似乎是 Camel 最擅长的。
    • 我已经有一段时间没有使用分离器了,但我记得有类似的痛苦。它与骆驼的基于处理器的模式有关,并且拆分器作为一个整体被认为是单个处理器,因此它期望构造单个输出。当您真的希望它发送离散消息而不是自动为您汇总结果时,这可能会引起相当大的恐慌。
    【解决方案2】:

    实际上,我不知道这是否是最佳实践,但如果不再需要正文内容,您可以定义一个 Processor 来清空正文,如下所示:

     public class EmptyProcessor implements Processor {
         @Override
         public void process(final Exchange exchange) throws Exception {
             exchange.getIn().setBody(null);
         }
     }
    

    此外,您可以并行处理单个拆分,这样持久的作业就不会阻塞处理。

    示例路线:

    final Random random = new Random(3);
    
    from("direct:start")
        .split().method(Splitter.class, "split")  // the splitter creates the Integer list
        .parallelProcessing()
        .executorService(Executors.newFixedThreadPool(2))
        .process(new Processor() {
            @Override
            public void process(final Exchange exchange) throws Exception {
                final long waitMs = (long) (random.nextFloat() * 1000);
                System.out.println("Doing a long lasting job for " + waitMs + " ms");
                Thread.sleep(wait);
            }
        })
        .process(new EmptyProcessor())
        .log("Body (should be emtpy): ${body}");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-06
      • 2023-03-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-26
      相关资源
      最近更新 更多