【发布时间】:2017-04-02 04:32:42
【问题描述】:
您好,我的输入文件如下所示
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1
我想将其拆分并聚合为多条消息,但将相关记录保存在一起,例如:
消息 1:
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
消息 2:
DDD,1
DDD,5
DDD,4
EEE,1
并防止这样的事情发生:
消息 1:
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
消息 2:
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1
CCC,2 应写入消息 1 或 CCC,1 应写入消息 2。
completionSize 不是恒定的,但应该是一个阈值。关于上面的示例,“如果有更多以 CCC 开头的记录也将它们放入消息中,则在消息中聚合 5 条记录”。
这是我的路线:
.split().tokenize("\n").streaming()
.aggregate().constant(true)
.aggregationStrategy(new MyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
达到completionSize 阈值5 后,MyAggregationStrategy 必须检查下一条消息(newExchange)并决定是否将其聚合到oldExchange,即使大小大于5。如果不聚合该消息对于 oldExchange,聚合完成,新的聚合开始。如何确保这条被上次聚合拒绝的消息将成为新聚合中的第一条消息?
由于输入文件可能非常大,我将使用流式传输而不是先读取整个文件,然后通过自定义 bean 将其剪切为单个消息。
【问题讨论】:
-
我想知道自定义拆分器是否会更好地工作:即不是按行拆分,而是读取文件并每隔 N 行拆分...但是扩展 N 以确保第一个具有相同值的记录字段保存在单个(拆分)消息中。
标签: java apache-camel