【发布时间】:2020-10-09 07:58:05
【问题描述】:
对于多播 + 聚合,我有以下奇怪的(或至少我不清楚)行为。考虑以下路线:
from("direct:multicaster")
.multicast()
.to("direct:A", "direct:B")
.aggregationStrategy(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List firstResult = newExchange.getIn().getBody(List.class);
newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
return newExchange;
} else {
List oldResults = oldExchange.getIn().getBody(List.class);
List newResults = newExchange.getIn().getBody(List.class);
ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
oldExchange.getIn().setBody(aggResult);
return oldExchange;
}
}
})
.end()
// .to("log:bla")
本质上,这条路由接受一个输入,将其发送到direct:A 和direct:B,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释是有原因的,我稍后会解释)。
现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我将消息M 发送到direct:multicaster,则使用oldExchange = null 和newExchange.in.body=[A] 调用一次聚合器,然后使用oldExchange.in.body=[A] 和newExchange.out.body=[B](应该这样做)。
到目前为止一切顺利。但是再次使用oldExchange.in.body=[A,B] 和newExchange.in=M 调用聚合器(M 是初始消息)。这看起来类似于包含的扩充模式。
您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个虚拟to("log:bla")。有了这个,一切都按预期运行。
更新:尝试(参见克劳斯提供的提示)
.multicast()
.aggregationStrategy(aggStrategy)
.to("direct:A", "direct:B")
.end()
和
.multicast(aggStrategy)
.to("direct:A", "direct:B")
.end()
两者都导致相同的行为。
这里发生了什么 - 我做错了什么?
提前致谢 标记
【问题讨论】:
-
你应该有aggregationStrategy之前
-
嗨克劳斯,我添加了两个新版本,我尝试过你的提示,但仍然得到相同的行为 - 这里一定有我遗漏的东西
-
你用的是什么版本的骆驼?您确定粘贴到问题中的代码是正在运行的确切代码吗?我无法重现该行为。
-
我sondrewe,我使用的是2.11.1;我将尝试使用最新版本,如果仍然失败,我将提取完整(最小)示例
-
我创建了一个独立的示例来显示完整的示例(这是一个测试设置):gist.github.com/frickm/7114961。有两条重要的行需要查看:第 91 行带有注释掉的日志端点和第 45 行,我在其中编织了一个模拟端点以使测试成为可能。删除第 45 行中的 weaveAddLast() 似乎可以工作,但无法对其进行测试。
标签: java apache-camel