【问题标题】:How to resolve Kafka Consumer poll timeout error如何解决 Kafka Consumer 轮询超时错误
【发布时间】:2019-10-25 07:37:53
【问题描述】:

我正在尝试通过 vagrant 机器使用 Apache Kafka 来运行一个简单的 Kafka Consumer 程序。程序在尝试调用 .poll(100) 方法时卡在 for 循环之前。

为了调试而深入挖掘了很多更深的类,但发现的并不多。

val TOPIC="testTopic"

val  props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(util.Collections.singletonList(TOPIC))

while(true) {
  println("Test")
  val records = consumer.poll(100)
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

}

当前输出测试,然后卡住没有错误消息。预计会输出Kafka topic的内容。

【问题讨论】:

  • 你能尝试用consumer.poll(new Duration(100, TimeUnit.MILLISECONDS))代替consumer.poll(100)吗?
  • @AlekseyIsachenkov 你是对的,但我只是补充说它需要更改 Kafka 客户端版本。

标签: scala apache-kafka vagrant kafka-consumer-api


【解决方案1】:

您需要升级您的 kafka-clients 版本到 2.0.0 或更高版本。例如,当 kafka 服务器关闭时,使用 KafkaConsumer 类中的 poll 方法,您将陷入内部循环,等待代理再次可用。

根据KIP-266

消费者记录

轮询(长时间超时)

已弃用。从 2.0 开始。 使用 poll(Duration),不阻塞 超出等待分区分配的超时时间。有关更多信息,请参见 KIP-266 信息。

在你的情况下:

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import scala.concurrent.duration._

// ...
val timeout = Duration(100, MILLISECONDS) 

while(true) {
  println("Test")
  val records = consumer.poll(timeout)
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

//...

总之,你只需要导入新版本的KafkaConsumer类,并将超时参数作为Duration对象的实例传递给新的poll方法即可。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-23
    • 2011-10-05
    • 1970-01-01
    • 1970-01-01
    • 2021-09-09
    相关资源
    最近更新 更多