【问题标题】:Camel "Composed Message Processor" how to detect "onComplete"Camel“组合消息处理器”如何检测“onComplete”
【发布时间】:2015-10-14 09:30:42
【问题描述】:

在我的骆驼路线中,我必须使用Composed Message Processor。这意味着我首先必须拆分交换(在来自 db 的大结果集上),然后我必须聚合它。

我使用解耦的 seda 队列看到的问题是如何检测工作是否完成(路由是手动触发的,并且会运行几个小时)。我必须回调基础设施,工作已完成。

使用“仅拆分器”变体不是一个好的选择,因为我会失去聚合器相关逻辑(相关交换必须有序)。但最大的问题是丢失了completionSize。我无法聚合到多个组中。

以下是该模式的示例:

from("direct:start")
    .split().body()
    .end()
    .to("seda:aggregate");

// collect and re-assemble the validated OrderItems into an order again
from("seda:aggregate")
    .process(setHeaders)
    .aggregate(new MyOrderAggregationStrategy()).header("orderId").completionSize(header("count"))
    .parallelProcessing()
    .process(doTheWork)
    .to("mock:result");

这里是一个仅用于拆分器的示例:

from("direct:start")
    .routePolicy( finishNotifier ) //implements onExchangeDone
    .split(body(), new MyOrderStrategy())
    .parallelProcessing()
    .process(doTheWork)
    .to("bean:MyOrderService?method=buildCombinedResponse")

附带说明:仅在禁用 parallelProcessing 时链接拆分和聚合才有效。启用并行处理“doTheWork”在finishNotifier.onExchangeDone 之后调用。由于我在工作完成时调用 context.stop(),因此在骆驼停止后调用了“进程”方法!

【问题讨论】:

  • 您能否发布一个简化版本的路线,以及您所说的“仅拆分器”变体是什么意思。
  • 添加了两种变体的示例

标签: apache-camel


【解决方案1】:

您可以实施自己的自定义聚合策略,该策略依赖于计算收到的交换数与您的流程拥有的记录总数。您还可以查看骆驼行动手册第 8-10 章,以获得许多基本实现示例的拆分器和聚合模式的非常好的参考示例。如果您使用骆驼,我强烈建议您至少选择一份 PDF 副本,因为它可以很容易地涵盖您的典型企业用例。

这篇博文也非常适合解决为您的用例正确设置聚合设置的一些问题:

http://tmielke.blogspot.com/2009/01/using-camel-aggregator-correctly.html

【讨论】:

  • 谢谢,但我知道这本书和博客。最后几天我读了很多关于我的问题。我还提交了一个错误:issues.apache.org/jira/browse/CAMEL-9222 在我的特殊情况下,我不知道要预先处理的所有条目的数量。
  • 那么我建议您在处理完所有文件后添加一条控制消息。您可以在拆分后添加一个方法,将最终计数计数作为标头发送到带有聚合器的路由,聚合器可以单独解释该消息并更新您的聚合实例。
【解决方案2】:

很抱歉回答我自己的问题,但我找到了一个非常简单的解决方案。我遇到这种模式的原因是骆驼用户组的建议。根本问题是聚合器模式中的broken(imho) parallelProcessing()。 我原来的路线只是链分离器和聚合器,但需要在聚合器的输出上使用多线程。

解决方案很简单:只需使用“threads()”而不是 parallelProcessing()。

【讨论】:

    猜你喜欢
    • 2014-10-17
    • 2019-05-16
    • 2016-01-16
    • 1970-01-01
    • 2019-01-13
    • 2017-04-04
    • 1970-01-01
    • 2014-09-03
    • 1970-01-01
    相关资源
    最近更新 更多