【问题标题】:Kafka Streams timeseries aggregationKafka Streams 时间序列聚合
【发布时间】:2017-12-13 15:18:51
【问题描述】:

我正在使用 Kafka Streams 来处理时间序列数据。一个用例是每小时汇总每个传感器的数据(传感器 ID 是主题 test 中的消息键)。

我编写了一个管道,它按键(传感器 ID)分组,然后计算每小时的读数。

问题是test 主题中有一些重复的消息(相同的传感器 ID 和时间戳)。我只想考虑最新消息。

Streams DSL API 中有什么东西可以完成这个吗?

  meterDataStream
   .groupByKey()
   .count(
     TimeWindows
       .of(TimeUnit.HOURS.toMillis(1))
       .until(TimeUnit.HOURS.toMillis(1)), 
     "counts")
   .foreach((key, value) => {
     val start = epochMillistoDate(key.window().start())
     val end   = epochMillistoDate(key.window().end())
     logger.info(s"$start - $end\t->$value")
   })

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    您需要为此构建自己的重复数据删除运算符。

    meterDateStream
        .transform(/*write your own deduplicator*/)
        .groupByKey()....
    

    重复数据删除器(即Transformer)必须具有附加的状态存储,并且您可能需要检查标点符号。查看文档了解更多详情:

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-10-15
      • 2018-10-05
      • 1970-01-01
      • 1970-01-01
      • 2018-05-15
      • 1970-01-01
      • 2019-10-14
      • 1970-01-01
      相关资源
      最近更新 更多