【发布时间】:2017-01-23 15:51:00
【问题描述】:
如何从 Kafka 集群获取指定日期的消息或数据。例如 9 月 13 日,谁能提供我的代码。我用谷歌搜索了它,只找到了理论,但我想要代码
【问题讨论】:
标签: apache-kafka
如何从 Kafka 集群获取指定日期的消息或数据。例如 9 月 13 日,谁能提供我的代码。我用谷歌搜索了它,只找到了理论,但我想要代码
【问题讨论】:
标签: apache-kafka
将此添加到当前命令 --property print.timestamp=true 这将打印时间戳 CreateTime:1609917197764。
示例: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --property print.timestamp=true --from-beginning
【讨论】:
从 kafka 获取特定日期的数据效率不高,因为数据线性存储在每个代理存储系统上的 kafka 内。因此,即使您在每条消息中都有时间戳或使用 kafka 的消息元数据(可能包含更高版本的 kafka 消息版本(>=0.10)中的时间戳),您仍然必须扫描每个分区上的整个主题以获取数据。由于 kafka 内部的数据不是按日期索引的,而是按偏移量索引的。
请记住,kafka 是一个队列,而不是数据库。如果您想要这种基于日期的检索模式,您可能需要考虑将 kafka 消息存储在另一个合适的数据库系统中,并使用时间戳作为索引。
【讨论】:
对此没有访问方法。另外,在 Kafka 之前,v0.10 消息不包含任何时间戳信息,因此无法知道消息何时写入主题。
从 Kafka v0.10 开始,每条消息都包含一个元数据时间戳属性,该属性要么由生产者在消息创建时设置,要么由代理在消息插入时设置。计划了基于时间的索引,但尚不可用。因此,您需要使用整个主题并检查时间戳字段(并忽略所有您不感兴趣的消息)。要找到开头,您还可以对偏移量和时间戳进行二进制搜索,以更快地找到第一条消息。
更新:
Kakfa 0.10.1 添加基于时间的索引。它允许seek 到时间戳等于或大于给定时间戳的第一条记录。您可以通过KafkaConsumer#offsetsForTime() 使用它。这将返回相应的偏移量,您可以将它们输入KafkaConsumer#seek()。您可以使用数据并通过ConsumerRecord#timestamp() 检查记录时间戳字段,以查看何时可以停止处理。
请注意,数据严格按偏移量排序,而不是按时间戳排序。因此,在处理过程中,您可能会得到时间戳比开始时间戳更小的“迟到”记录(不过,您可以简单地跳过这些记录)。
一个更困难的问题是在您的搜索间隔结束时迟到的记录。在您获得第一个时间戳比您的搜索间隔更大的时间戳后,稍后可能仍然有时间戳记录属于您的搜索间隔的一部分(如果这些记录确实附加到主题“迟到”)。但是,没有办法知道这一点。因此,您可能希望继续阅读“更多”记录并检查是否有“迟到”记录。 “一些记录”的含义是多少,是需要您自己做出的设计决定。
虽然没有一般的指导方针——如果您对“写入模式”有更多的了解,它可以帮助您定义一个好的策略,以便在您的搜索间隔“结束”后要消费多少条记录。当然有两种默认策略:(1)在时间戳大于搜索间隔的第一条记录处停止(并有效地忽略任何迟到的记录——如果您使用“日志附加时间”配置,这当然是一个安全的策略); (2)你读到日志的末尾——这是关于完整性的最安全的策略,但可能会导致过高的开销(另请注意,因为记录可以随时附加,如果记录“延迟”可能是任意大的,到达日志结束后,甚至可能会附加一条迟到的记录)。
在实践中,考虑“最大预期延迟”可能是个好主意,并一直阅读,直到您获得时间戳大于此延迟上限的记录。
【讨论】:
ProducerRecord 的构造函数有多个重载;有些接受时间戳参数(长类型)。对于broker端时间戳,需要更改对应的topic config message.timestamp.type cf kafka.apache.org/documentation/#topicconfigs
TimestampExtractor 用于获取输入记录的时间戳。在写入时,记录时间戳将由流明确设置。 Atm,它没有记录 Kafka Streams 如何计算和设置输出记录的时间戳(我们计划很快缩小这个差距)。请注意,从 2.0 版本开始,您可以在 Kafka Streams 中显式设置记录的时间戳 (cwiki.apache.org/confluence/display/KAFKA/…),如果记录写入输出主题,则会设置此时间戳。
ConsumerRecord#timestamp() 中的时间戳是放在主题上的时间,还是被消费的时间?