【问题标题】:Apache Camel - Aggregation split elements to a listApache Camel - 将元素聚合拆分为列表
【发布时间】:2018-09-29 02:02:23
【问题描述】:

问题:

我首先有一个笼子列表,我在其中单独检索并拆分这些笼子的位置。

后面的消费者已经一一拿到笼子,处理器提取位置。

from("direct:cages-to-positions")
                    .process(new CageToPositionProcessor())
                    .aggregate(body(), new PositionsAggregation())
                    .completionTimeout(1000)
                    .to("mock:test");

在那之后,我不想让它们回到列表中,所以我使用以下聚合。

public class PositionsAggregation implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            List<Map> positions = new ArrayList<Map>();
            positions.add((Map) newExchange.getIn().getBody());
            newExchange.getOut().setBody(positions);
            return newExchange;
        }

        Map newPosition = (Map) newExchange.getIn().getBody();
        List<Map> positions = (List<Map>) oldExchange.getIn().getBody();
        positions.add(newPosition);
        oldExchange.getIn().setBody(positions);
        return oldExchange;
    }
}

当我调试时,我可以看到我得到了正确的结尾 oldExchange,但无论如何模拟中的结果根本不是列表:

打印最后一个 oldExchange 正文:

[{latitude=49.305, longitude=1.2157357}, {latitude=49.305142, longitude=1.2154067}]

模拟结果:

11191 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.SendProcessor  - >>>> mock://test Exchange[ID-1524074230289-0-6]
11191 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.component.mock.MockEndpoint  - mock://test >>>> 0 : Exchange[ID-1524074230289-0-6] with body: {latitude=49.305, longitude=1.2157357} and headers:{breadcrumbId=ID-1524074230289-0-6}
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap  - Completion timeout triggered for correlation key: {latitude=49.305142, longitude=1.2154067}
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor  - Aggregation complete for correlation key **{latitude=49.305142, longitude=1.2154067}** sending aggregated exchange: Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor  - Processing aggregated exchange: Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.SendProcessor  - >>>> mock://test Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.component.mock.MockEndpoint  - mock://test >>>> 1 : Exchange[ID-1524074230289-0-9] with body: {latitude=49.305142, longitude=1.2154067} and headers:{breadcrumbId=ID-1524074230289-0-9}

【问题讨论】:

    标签: java apache-camel


    【解决方案1】:

    您的示例适用于相同的传入主体。 您应该将相关表达式从 body() 替换为 constant(true),因为您没有在拆分器中使用内置聚合器。

    所以你

    1) 重构以使用拆分器的内置聚合器,例如

    .split(body(), new MyAggregationStrategy())
    

    在这种情况下不需要完成条件(.completionTimeout())。

    这将聚合传入列表中的所有拆分消息。这个answer中的一个例子。

    2) 如果您想在 1000 周期内收集所有传入交换,您必须进行以下更改

     .aggregate(constant(true), new PositionsAggregation())
    

    【讨论】:

    • 嗨,谢谢你的回答,我不确定第一个参数“correlationExpression”代表什么,特别是常量(真)值,你能帮我吗?另外,为什么我可以在调试中看到必须建立正确的列表?无论如何,您的解决方案奏效了!
    • 阅读骆驼文档,了解相关表达式 github.com/apache/camel/blob/master/camel-core/src/main/docs/… 。至于调试器,它可能只访问了第一个 if(oldExcange always null) 并且该策略创建了两个列表,每个列表都有一个元素。
    • 好的,我知道了,让我感到困扰的是调试器向我显示了一个包含两个位置的列表,但我想我挂断了,迷失了自己。
    猜你喜欢
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多