【发布时间】:2019-10-25 07:37:53
【问题描述】:
我正在尝试通过 vagrant 机器使用 Apache Kafka 来运行一个简单的 Kafka Consumer 程序。程序在尝试调用 .poll(100) 方法时卡在 for 循环之前。
为了调试而深入挖掘了很多更深的类,但发现的并不多。
val TOPIC="testTopic"
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(TOPIC))
while(true) {
println("Test")
val records = consumer.poll(100)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}
}
当前输出测试,然后卡住没有错误消息。预计会输出Kafka topic的内容。
【问题讨论】:
-
你能尝试用
consumer.poll(new Duration(100, TimeUnit.MILLISECONDS))代替consumer.poll(100)吗? -
@AlekseyIsachenkov 你是对的,但我只是补充说它需要更改 Kafka 客户端版本。
标签: scala apache-kafka vagrant kafka-consumer-api