【发布时间】:2018-09-29 02:02:23
【问题描述】:
问题:
我首先有一个笼子列表,我在其中单独检索并拆分这些笼子的位置。
后面的消费者已经一一拿到笼子,处理器提取位置。
from("direct:cages-to-positions")
.process(new CageToPositionProcessor())
.aggregate(body(), new PositionsAggregation())
.completionTimeout(1000)
.to("mock:test");
在那之后,我不想让它们回到列表中,所以我使用以下聚合。
public class PositionsAggregation implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List<Map> positions = new ArrayList<Map>();
positions.add((Map) newExchange.getIn().getBody());
newExchange.getOut().setBody(positions);
return newExchange;
}
Map newPosition = (Map) newExchange.getIn().getBody();
List<Map> positions = (List<Map>) oldExchange.getIn().getBody();
positions.add(newPosition);
oldExchange.getIn().setBody(positions);
return oldExchange;
}
}
当我调试时,我可以看到我得到了正确的结尾 oldExchange,但无论如何模拟中的结果根本不是列表:
打印最后一个 oldExchange 正文:
[{latitude=49.305, longitude=1.2157357}, {latitude=49.305142, longitude=1.2154067}]
模拟结果:
11191 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.SendProcessor - >>>> mock://test Exchange[ID-1524074230289-0-6]
11191 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.component.mock.MockEndpoint - mock://test >>>> 0 : Exchange[ID-1524074230289-0-6] with body: {latitude=49.305, longitude=1.2157357} and headers:{breadcrumbId=ID-1524074230289-0-6}
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap - Completion timeout triggered for correlation key: {latitude=49.305142, longitude=1.2154067}
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete for correlation key **{latitude=49.305142, longitude=1.2154067}** sending aggregated exchange: Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated exchange: Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.SendProcessor - >>>> mock://test Exchange[ID-1524074230289-0-9]
11192 [Camel (camel-1) thread #1 - AggregateTimeoutChecker] DEBUG org.apache.camel.component.mock.MockEndpoint - mock://test >>>> 1 : Exchange[ID-1524074230289-0-9] with body: {latitude=49.305142, longitude=1.2154067} and headers:{breadcrumbId=ID-1524074230289-0-9}
【问题讨论】:
标签: java apache-camel