【发布时间】:2020-07-16 21:43:20
【问题描述】:
我想要消息被生产者插入 kafka 主题的时间戳。
在 kafka 消费者方面,我想提取那个时间戳。
class Producer {
def main(args: Array[String]): Unit = {
writeToKafka("quick-start")
}
def writeToKafka(topic: String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9094")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String](topic, "key", "value")
producer.send(record)
producer.close()
}
}
class Consumer {
def main(args: Array[String]): Unit = {
consumeFromKafka("quick-start")
}
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9094")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
}
kafka 提供了一种方法吗?否则我将不得不从生产者向主题发送一个额外的字段。
【问题讨论】:
-
data.timestamp应该给你时间戳。但是,我不完全确定现在是什么时间......生产时间,在经纪人中存储的时间,确认成功生产的时间...... -
是的,我认为迈克是对的。应该有时间戳。它可能在 headers.timestamp 下。不确定你的结构。我猜 data.headers.timestamp 但我不确定
标签: scala apache-kafka