【问题标题】:Apache Camel: multicast with aggregation - AggregationStrategy called too oftenApache Camel:具有聚合的多播 - 过于频繁地调用 AggregationStrategy
【发布时间】:2020-10-09 07:58:05
【问题描述】:

对于多播 + 聚合,我有以下奇怪的(或至少我不清楚)行为。考虑以下路线:

    from("direct:multicaster")
                .multicast()
                .to("direct:A", "direct:B")
                .aggregationStrategy(new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            List firstResult = newExchange.getIn().getBody(List.class);
                            newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                            return newExchange;
                        } else {
                            List oldResults = oldExchange.getIn().getBody(List.class);
                            List newResults = newExchange.getIn().getBody(List.class);
                            ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                            oldExchange.getIn().setBody(aggResult);
                            return oldExchange;
                        }
                    }
                })
                .end()
//                .to("log:bla")

本质上,这条路由接受一个输入,将其发送到direct:Adirect:B,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释是有原因的,我稍后会解释)。

现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我将消息M 发送到direct:multicaster,则使用oldExchange = nullnewExchange.in.body=[A] 调用一次聚合器,然后使用oldExchange.in.body=[A]newExchange.out.body=[B](应该这样做)。

到目前为止一切顺利。但是再次使用oldExchange.in.body=[A,B]newExchange.in=M 调用聚合器(M 是初始消息)。这看起来类似于包含的扩充模式。

您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个虚拟to("log:bla")。有了这个,一切都按预期运行。

更新:尝试(参见克劳斯提供的提示)

            .multicast()
            .aggregationStrategy(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

            .multicast(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

两者都导致相同的行为。

这里发生了什么 - 我做错了什么?

提前致谢 标记

【问题讨论】:

  • 你应该有aggregationStrategy之前
  • 嗨克劳斯,我添加了两个新版本,我尝试过你的提示,但仍然得到相同的行为 - 这里一定有我遗漏的东西
  • 你用的是什么版本的骆驼?您确定粘贴到问题中的代码是正在运行的确切代码吗?我无法重现该行为。
  • 我sondrewe,我使用的是2.11.1;我将尝试使用最新版本,如果仍然失败,我将提取完整(最小)示例
  • 我创建了一个独立的示例来显示完整的示例(这是一个测试设置):gist.github.com/frickm/7114961。有两条重要的行需要查看:第 91 行带有注释掉的日志端点和第 45 行,我在其中编织了一个模拟端点以使测试成为可能。删除第 45 行中的 weaveAddLast() 似乎可以工作,但无法对其进行测试。

标签: java apache-camel


【解决方案1】:

我试图重现该问题,但没有成功。这就是我所做的:

路线:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

创建了一个简单的测试来运行路由。

测试:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

打印出来:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

这是我们所期望的。希望这会帮助你。我看不出我做事的方式有什么不同,但希望你能发现。

【讨论】:

    【解决方案2】:

    我遇到了同样的问题。有两件事似乎很关键

    • 将 AggregationStrategy 设置为之前(我会直接将其设置为多播的参数)
    • 用“end()”结束多播

    如果你看一下每个节点的返回类型,我想你可以看出区别

    【讨论】:

      【解决方案3】:

      原因是因为聚合策略是跨不同对象实例的共享代码块。因此,无论何时创建实例,差异都会出现。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2023-03-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-02-17
        • 2016-04-29
        相关资源
        最近更新 更多