【发布时间】:2018-09-21 06:52:54
【问题描述】:
我在 Kafka 主题“原始数据”中获取 CSV,目标是通过发送具有正确时间戳(每行不同)的另一个主题“数据”中的每一行来转换它们。
目前,我有 2 个主播:
- 用于拆分“原始数据”中的行,将它们发送到“内部”主题(无时间戳)
- 带有
TimestampExtractor的一个消耗“内部”并将它们发送到“数据”。
我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到方法(时间戳提取器仅在消费时使用)。
我在文档中偶然发现了这一行:
请注意,可以在处理器 API 中更改描述默认行为,方法是在调用 #forward() 时为输出记录显式分配时间戳。
但我找不到任何带有时间戳的签名。它们是什么意思?
你会怎么做?
编辑: 需要明确的是,我有一个 Kafka 主题,其中包含一条消息,其中包含事件时间和一些值,例如:
2018-01-01,hello
2018-01-02,world
(这是一条消息,而不是两条)
我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为它们的事件时间(2018-01-01 和 2018-01-02),而不需要中间主题。
【问题讨论】:
-
它需要是 Kafka Streams,还是您对 KSQL 的示例感兴趣?
-
我只使用 Kafka Streams。如果在 KSQL 中是可能的,那就意味着在 Kafka Streams 中有一种方法。
-
没关系,我会留给 Kafka Streams 专家来回答 :)
标签: apache-kafka apache-kafka-streams