【问题标题】:Kafka scala Consumer code - to print consumed recordsKafka scala消费者代码 - 打印消费记录
【发布时间】:2021-06-25 15:36:04
【问题描述】:

因为我正在使用 url 创建简单的 kafka 消费者,如下所示:https://gist.github.com/akhil/6dfda8a04e33eff91a20

在该链接中,为了打印消费记录,使用了一个未标识的单词“asScala”。请告诉我,如何迭代返回类型: ConsumerRecord[String,String] ,即 poll() 方法的返回类型。

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

 
object KafkaConsumerEx extends App {

  val topic_name = "newtopic55"
  val consumer_group = "KafkaConsumerBatch"

  val prot = new Properties()
  prot.put("bootstrap.servers","localhost:9092")
  prot.put("group.id",consumer_group)
  prot.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer")
  prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

  val kfk_consumer = new KafkaConsumer[String,String](prot)
  kfk_consumer.subscribe(util.Collections.singleton(topic_name))
  println("here")

   while(true){
    val consumer_record : ConsumerRecords[String, String]  = kfk_consumer.poll(100)
    println("records count : " + consumer_record.count())
    println("records partitions: " + consumer_record.partitions())
    consumer_record.iterator().


  }

}

感谢您的建议。

【问题讨论】:

  • 你试过类似for (record <- consumer_record.iterator()){ // do stuff with record }的东西吗?
  • 是的。我尝试了不同的方法。每次出现编译错误:错误:(29, 44) value foreach is not a member of java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] for(record
  • 添加import scala.collection.JavaConversions._
  • 运气不好,已经试过了。

标签: scala kafka-consumer-api


【解决方案1】:

你可以轻松做到这一点

for (record <- consumer_record.iterator()) {
  println(s"Here's your $record")
}

记得添加这个导入:

import scala.collection.JavaConversions._

【讨论】:

    【解决方案2】:

    添加另一个答案,因为scala.collection.JavaConversions 已被弃用,如here 所述。

    至于这个问题,代码可能是这样的

    import scala.collection.JavaConverters._
    
    for (record <- asScalaIterator(consumer_record.iterator)) {
      println(s"Here's your $record")
    }
    

    【讨论】:

      【解决方案3】:
      while(true){
          val consumer_records = kfk_consumer.poll(100)
          val record_iter=consumer_record.iterator()
          while(record_iter.hasNext())
          {
             record=record_iter.next()
             println("records partitions: " + record.partition()
                     "records_data:" + record.value())
          }
      }
             
      

      【讨论】:

      • 你能解释一下吗?你在这里做什么?这与接受的答案有何不同,您为什么更喜欢这种方法?
      猜你喜欢
      • 2019-11-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多