【发布时间】:2017-12-13 15:18:51
【问题描述】:
我正在使用 Kafka Streams 来处理时间序列数据。一个用例是每小时汇总每个传感器的数据(传感器 ID 是主题 test 中的消息键)。
我编写了一个管道,它按键(传感器 ID)分组,然后计算每小时的读数。
问题是test 主题中有一些重复的消息(相同的传感器 ID 和时间戳)。我只想考虑最新消息。
Streams DSL API 中有什么东西可以完成这个吗?
meterDataStream
.groupByKey()
.count(
TimeWindows
.of(TimeUnit.HOURS.toMillis(1))
.until(TimeUnit.HOURS.toMillis(1)),
"counts")
.foreach((key, value) => {
val start = epochMillistoDate(key.window().start())
val end = epochMillistoDate(key.window().end())
logger.info(s"$start - $end\t->$value")
})
【问题讨论】:
标签: apache-kafka apache-kafka-streams