【问题标题】:Camel AggregationStrategy keep excluded message for next aggregation iterationCamel AggregationStrategy 为下一次聚合迭代保留排除消息
【发布时间】:2017-04-02 04:32:42
【问题描述】:

您好,我的输入文件如下所示

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

我想将其拆分并聚合为多条消息,但将相关记录保存在一起,例如:

消息 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2

消息 2:

DDD,1
DDD,5
DDD,4
EEE,1

并防止这样的事情发生:

消息 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1

消息 2:

CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

CCC,2 应写入消息 1 或 CCC,1 应写入消息 2。

completionSize 不是恒定的,但应该是一个阈值。关于上面的示例,“如果有更多以 CCC 开头的记录也将它们放入消息中,则在消息中聚合 5 条记录”。

这是我的路线:

.split().tokenize("\n").streaming()
.aggregate().constant(true)
.aggregationStrategy(new MyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)

达到completionSize 阈值5 后,MyAggregationStrategy 必须检查下一条消息(newExchange)并决定是否将其聚合到oldExchange,即使大小大于5。如果不聚合该消息对于 oldExchange,聚合完成,新的聚合开始。如何确保这条被上次聚合拒绝的消息将成为新聚合中的第一条消息?

由于输入文件可能非常大,我将使用流式传输而不是先读取整个文件,然后通过自定义 bean 将其剪切为单个消息。

【问题讨论】:

  • 我想知道自定义拆分器是否会更好地工作:即不是按行拆分,而是读取文件并每隔 N 行拆分...但是扩展 N 以确保第一个具有相同值的记录字段保存在单个(拆分)消息中。

标签: java apache-camel


【解决方案1】:

为什么不只将相关元素组合在一起呢?您当前正在使用 .constant(true) 进行聚合,这意味着只有一个关联组要聚合。相反,您可以执行以下操作:

.split().tokenize("\n").streaming()
.process(e -> ...)  //extract the type (AAA, BBB, etc.) into a header called type
.aggregate(header("type"), new MyAggregationStrtegy())
.completionTimeout(5000)

这样,只有相关的消息才会成为聚合批次的一部分。

【讨论】:

  • 谢谢,该文件包含大约 50.000 行。我为stackoverflow简化了属性AAA、BBB。在现实生活中,它们是长度为 6 的字母数字字符串。同一个字符串可能多次出现,也可能不出现。字符串是逻辑 id 的一部分。在该文件中可能有大约 10.000 种不同类型的此字符串。您建议的算法是否创建 10.000 个类型标题?然后为每种类型创建一条消息,然后将具有相同 6 字符 Sting 的所有传入记录聚合到其中?
  • 它为每条消息创建一个带有该字符串的标头,然后将具有相同 6 个字符字符串的消息聚合到同一个聚合组中。因此,它创建了 50k 类型的标头,这将导致至少 10k 个聚合组。我说至少是因为如果文件中的记录没有排序,由于超时,您可能不会在单个组中拥有具有相同 ID 的所有记录。
  • 好的,明白了。但这一切都发生在记忆中,不是吗?我们正在迁移旧的遗留系统。对于每一行,都会创建一个具有大约 50 个属性的对象。在内存中保存 50k 的这些对象可能会导致内存问题。
  • 当然,您不会将所有这些都保存在内存中 - 在当前设置下,您最多可以将每个组保存 5 秒,但如果您只是阅读,则 5 秒可能太多了从文件中提取,而不是对每条消息进行任何处理。在这种情况下,1 秒甚至 500 毫秒就足够了。
  • 嗨,Miloš,它几乎可以工作了,但现在我收到了 java.util.concurrent.RejectedExecutionException: null,但我不使用 parallelProcessing?
【解决方案2】:

您可以打开eagerCheckCompletiontrue,这将在聚合之前检查是否完成。在文档中查看更多详细信息:http://camel.apache.org/aggregator2

【讨论】:

  • 谢谢,当我使用自我实现的完成谓词并将 eagerCheckCompletion 设置为 true 时,我在 Predicate#matches 中处理当前交换。我有机会在Predicate#matches查看之前的交流,看看当前交流的内容是否和之前交流的内容吻合吗?
  • 不,目前不可能。这将需要增强聚合器 EIP 以支持此类用例。
  • @nick78 在你的情况下,我想你可以在你的聚合器/谓词实例中保留最后一个键。
  • 谢谢大流士。我决定使用 Miloš 解决方案。这对我来说很好。
猜你喜欢
  • 2014-02-20
  • 2014-01-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-04-16
  • 1970-01-01
相关资源
最近更新 更多