【问题标题】:Is there a way to apply filters on a count function in Kafka streams?有没有办法在 Kafka 流中的计数函数上应用过滤器?
【发布时间】:2020-06-10 06:00:46
【问题描述】:

我的用例如下 - 我有一个包含特定 ID 的消息的主题。 我创建了一个 Kafka Streams 应用程序,它聚合具有相同 ID 的消息并对它们进行计数(类似于 https://kafka.apache.org/10/documentation/streams/tutorial 中的示例 WordCount 实现)

我希望 Kafka 流仅在超过某个阈值时才向输出主题发送消息。例如,如果我将阈值定义为 10,我希望在流处理了 10 条具有相同 ID 的消息后,将一条消息发送到输出主题。

我知道这可以通过有一个额外的主题和另一个处理该主题的流来完成,但是有没有办法在一个流中做到这一点?

【问题讨论】:

  • 您的“单流”更像是一个 KTable。您在这里需要另一个主题,但“过滤器”是您正在寻找的错误词。试试Punctuator

标签: apache-kafka apache-kafka-streams


【解决方案1】:

使用count()聚合函数将KStream转换为Stream后,可以对count值进行过滤,转换为Stream并发送到特定主题:

.selectKey((k, v) -> v)
        .groupByKey()
        .count()
        .filter((key, count) -> count > 3)
        .toStream()
        .filter((key, count) -> count != null)
        .to("output", Produced.with(Serdes.String(), Serdes.String()));

【讨论】:

  • 您也可以直接通过KTable#filter() 对结果KTable 应用过滤器。顺便说一句:您可能想将自己的答案标记为已接受:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-02-06
  • 2020-07-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多