【问题标题】:How to get the latest value from a kafka Stream如何从 kafka 流中获取最新值
【发布时间】:2020-04-11 05:25:43
【问题描述】:

我对 Kafka 和流媒体相当陌生。我有一个要求,比如每次运行 kafka 生产者和消费者时,我都应该得到生产者生成的唯一消息。

下面是Producer和consumer的基本代码

制片人

 val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("test", "key", jsonstring)
    producer.send(record)
    producer.close()

消费者

val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    props.put("group.id", "13")
    val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
    consumer.subscribe(util.Arrays.asList("test"))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator){
        println(data.value())

      }

我使用的输入 Json 如下

{

"id":1,

"名称":"foo"

}

现在我面临的问题是每次运行程序时都会得到重复的值。例如,如果我运行代码两次,消费者输出看起来像这样

{

"id":1,

"名称":"foo"

}

{

"id":1,

"名称":"foo"

}

我想要的输出就像我运行程序一样,生产者处理的唯一消息应该是消费并且应该被打印。

我尝试了一些方法,例如将消费者属性的偏移量更改为最新

props.put("auto.offset.reset", "latest")

我也尝试了下面提到的东西,但它对我不起作用 How can I get the LATEST offset of a kafka topic?

您能提出任何替代方案吗?

【问题讨论】:

  • 每次通话都使用相同的 group.id 吗?好像你正在改变它
  • @sun007 是的,我正在更改它。实际上,如果我不更改组 ID,我不会在消费者中收到任何消息

标签: scala apache-kafka streaming kafka-consumer-api


【解决方案1】:

消费者按顺序从主题分区读取消息。 如果调用 poll(),它会返回写入到 Kafka 的记录,但我们组中的消费者尚未阅读。 Kafka 在每个分区上跟踪它们的消费偏移量,以了解在重启的情况下从哪里开始消费。 消费者通过提交在主题 __consumer_offsets 中维护其分区偏移量。

Commit 是更新当前位置的动作 __consumer_offsets。

如果消费者重新启动,为了知道从哪里开始消费,消费者会读取每个分区的最新提交的偏移量并从那里继续。

您可以通过两种方式控制提交,将自动提交设置为 true 和提交间隔

1.通过 enable.auto.commit true

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

2.手动提交

consumer.commitAsync();//asyn commit
or  
consumer.commitSync();//sync commit

如果您未能提交,它将从最后提交的位置重新启动,如下图所示

auto.offset.reset:

消费者第一次重新启动后,它会使用 auto.offset.reset 来确定每个分配分区的初始位置。请注意,当第一次使用唯一组 id 创建组时,在消费任何消息之前,根据可配置的偏移重置策略设置位置 (auto.offset.reset)时间>。之后,它将继续增量消费消息并使用提交(如上所述)跟踪最新的消费消息

注意:如果消费者在提交任何偏移量之前崩溃, 那么接管其分区的消费者将使用重置 政策。

所以你的情况

  1. 使用手动偏移提交或 enable.auto.commit true 进行自动提交。
  2. 如果更改组会处理不同的消费者并使用 auto.offset.reset 分配偏移量,请始终使用相同的组 ID。

参考:https://www.confluent.io/resources/kafka-the-definitive-guide/

【讨论】:

猜你喜欢
  • 2018-05-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-25
  • 2017-12-12
  • 1970-01-01
  • 2022-12-19
  • 1970-01-01
相关资源
最近更新 更多