【问题标题】:Get Latest value from Kafka从 Kafka 获取最新值
【发布时间】:2018-05-29 18:02:31
【问题描述】:

我有一个名为 A 的 Kafka 主题。

主题A中的数据格式为:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

现在在消费者方面,我只想获取一小时窗口的最新数据意味着每隔一小时我需要根据 created_at 从主题获取最新值

我的预期输出是:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

我认为这可以通过 ksql 解决,但我不确定。请帮帮我。

提前致谢。

【问题讨论】:

  • 你的钥匙是什么?
  • 认为key可能是message 1、message 2等等......上面提到的是values
  • 酷。只是想确保您牢记密钥,因为 Kafka Streams 根据密钥聚合/分组/执行所有操作。

标签: apache-kafka apache-kafka-streams ksqldb


【解决方案1】:

是的,您可以为此使用 KSQL。请尝试以下操作:

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');

CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;

结果将具有 linux 时间戳格式的 created_at 时间。您可以在新查询中使用 TIMESTAMPTOSTRING udf 将其更改为所需的格式。 如果您发现任何问题,请告诉我。

【讨论】:

  • 感谢您的回复,我可以将 1 小时的窗口也缩短到 10 分钟吗,这会导致任何性能问题吗?
  • 当然,您可以使用(size 10 minutes)。它不应该有任何重大的性能问题。
  • 感谢您的回复,还有一个问题,ksql 是把数据存储在内存还是磁盘中?
  • 内部状态存储使用RocksDB并将状态存储在内存中。查询的结果将写入 kafka 主题中,当然是在磁盘上!
  • @matthias-j-sax 是否可以使用 KTable 实现这一点?如果是的话,有什么例子吗?
猜你喜欢
  • 2020-04-11
  • 1970-01-01
  • 2019-12-20
  • 2020-07-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-12
相关资源
最近更新 更多