【问题标题】:Get kafka record timestamp from kafka message从kafka消息中获取kafka记录时间戳
【发布时间】:2020-07-16 21:43:20
【问题描述】:

我想要消息被生产者插入 kafka 主题的时间戳。

在 kafka 消费者方面,我想提取那个时间戳。


class Producer {
  def main(args: Array[String]): Unit = {
    writeToKafka("quick-start")
  }
  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](topic, "key", "value")
    producer.send(record)
    producer.close()
  }
}



class Consumer {
  def main(args: Array[String]): Unit = {
    consumeFromKafka("quick-start")
  }
  def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}

kafka 提供了一种方法吗?否则我将不得不从生产者向主题发送一个额外的字段。

【问题讨论】:

  • data.timestamp 应该给你时间戳。但是,我不完全确定现在是什么时间......生产时间,在经纪人中存储的时间,确认成功生产的时间......
  • 是的,我认为迈克是对的。应该有时间戳。它可能在 headers.timestamp 下。不确定你的结构。我猜 data.headers.timestamp 但我不确定

标签: scala apache-kafka


【解决方案1】:

Kafka 从 v0.10 开始提供了一种方式

从那个版本开始,您的所有消息在data.timestamp 中都有一个可用的时间戳信息,其中的信息类型由您的代理上的配置“message.timestamp.type”决定。该值应为CreateTimeLogAppendTime

在此版本之前,您必须手动实现它,通常是通过修改数据结构。

【讨论】:

  • 需要注意的是,时间戳是生产者在ProducerRecord创建时插入的:它可以是生产系统的本地纪元时间(默认),也可以是由应用程序定制为更具体的领域(例如,如果从多个来源获得的数据合成记录,它可能是源数据的最新时间戳)。
猜你喜欢
  • 2020-12-21
  • 1970-01-01
  • 2020-11-18
  • 2019-04-18
  • 1970-01-01
  • 2017-03-27
  • 2021-09-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多