【发布时间】:2021-06-25 15:36:04
【问题描述】:
因为我正在使用 url 创建简单的 kafka 消费者,如下所示:https://gist.github.com/akhil/6dfda8a04e33eff91a20。
在该链接中,为了打印消费记录,使用了一个未标识的单词“asScala”。请告诉我,如何迭代返回类型: ConsumerRecord[String,String] ,即 poll() 方法的返回类型。
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
object KafkaConsumerEx extends App {
val topic_name = "newtopic55"
val consumer_group = "KafkaConsumerBatch"
val prot = new Properties()
prot.put("bootstrap.servers","localhost:9092")
prot.put("group.id",consumer_group)
prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val kfk_consumer = new KafkaConsumer[String,String](prot)
kfk_consumer.subscribe(util.Collections.singleton(topic_name))
println("here")
while(true){
val consumer_record : ConsumerRecords[String, String] = kfk_consumer.poll(100)
println("records count : " + consumer_record.count())
println("records partitions: " + consumer_record.partitions())
consumer_record.iterator().
}
}
感谢您的建议。
【问题讨论】:
-
你试过类似
for (record <- consumer_record.iterator()){ // do stuff with record }的东西吗? -
是的。我尝试了不同的方法。每次出现编译错误:错误:(29, 44) value foreach is not a member of java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] for(record
-
添加
import scala.collection.JavaConversions._ -
运气不好,已经试过了。