【发布时间】:2015-10-14 09:30:42
【问题描述】:
在我的骆驼路线中,我必须使用Composed Message Processor。这意味着我首先必须拆分交换(在来自 db 的大结果集上),然后我必须聚合它。
我使用解耦的 seda 队列看到的问题是如何检测工作是否完成(路由是手动触发的,并且会运行几个小时)。我必须回调基础设施,工作已完成。
使用“仅拆分器”变体不是一个好的选择,因为我会失去聚合器相关逻辑(相关交换必须有序)。但最大的问题是丢失了completionSize。我无法聚合到多个组中。
以下是该模式的示例:
from("direct:start")
.split().body()
.end()
.to("seda:aggregate");
// collect and re-assemble the validated OrderItems into an order again
from("seda:aggregate")
.process(setHeaders)
.aggregate(new MyOrderAggregationStrategy()).header("orderId").completionSize(header("count"))
.parallelProcessing()
.process(doTheWork)
.to("mock:result");
这里是一个仅用于拆分器的示例:
from("direct:start")
.routePolicy( finishNotifier ) //implements onExchangeDone
.split(body(), new MyOrderStrategy())
.parallelProcessing()
.process(doTheWork)
.to("bean:MyOrderService?method=buildCombinedResponse")
附带说明:仅在禁用 parallelProcessing 时链接拆分和聚合才有效。启用并行处理“doTheWork”在finishNotifier.onExchangeDone 之后调用。由于我在工作完成时调用 context.stop(),因此在骆驼停止后调用了“进程”方法!
【问题讨论】:
-
您能否发布一个简化版本的路线,以及您所说的“仅拆分器”变体是什么意思。
-
添加了两种变体的示例
标签: apache-camel