【发布时间】:2020-04-11 05:25:43
【问题描述】:
我对 Kafka 和流媒体相当陌生。我有一个要求,比如每次运行 kafka 生产者和消费者时,我都应该得到生产者生成的唯一消息。
下面是Producer和consumer的基本代码
制片人
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("test", "key", jsonstring)
producer.send(record)
producer.close()
消费者
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "13")
val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
consumer.subscribe(util.Arrays.asList("test"))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator){
println(data.value())
}
我使用的输入 Json 如下
{
"id":1,
"名称":"foo"
}
现在我面临的问题是每次运行程序时都会得到重复的值。例如,如果我运行代码两次,消费者输出看起来像这样
{
"id":1,
"名称":"foo"
}
{
"id":1,
"名称":"foo"
}
我想要的输出就像我运行程序一样,生产者处理的唯一消息应该是消费并且应该被打印。
我尝试了一些方法,例如将消费者属性的偏移量更改为最新
props.put("auto.offset.reset", "latest")
我也尝试了下面提到的东西,但它对我不起作用 How can I get the LATEST offset of a kafka topic?
您能提出任何替代方案吗?
【问题讨论】:
-
每次通话都使用相同的 group.id 吗?好像你正在改变它
-
@sun007 是的,我正在更改它。实际上,如果我不更改组 ID,我不会在消费者中收到任何消息
标签: scala apache-kafka streaming kafka-consumer-api