【问题标题】:I am using Kafka Producer Api to write messages from a file into kafka topic, but the logs of kafka topic is showing empty?我正在使用 Kafka Producer Api 将文件中的消息写入 kafka 主题,但 kafka 主题的日志显示为空?
【发布时间】:2019-04-01 13:43:01
【问题描述】:

我正在使用以下 Producer API 代码将消息写入 Kafka 主题,但无法将消息写入主题:

import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.io.Source

object KafkaProducerDemo {

    def main(args: Array[String]): Unit = {

        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[Nothing, String](props)


        val logMessages = Source.
          fromFile("/opt/gen_logs/logs/access.log").
          getLines.
          toList

        logMessages.foreach(message => {
          val record = new ProducerRecord("retail-multi", message)
          producer.send(record)
        })
    }
}

【问题讨论】:

  • 任何异常抛出?
  • 是的,我在 Kafka Broker 遇到了异常。错误处理器有未捕获的异常。 (kafka.network.Processor) java.lang.ArrayIndexOutOfBoundsException: 18 at org.apache.kafka.common.protocol.Api Keys.forId(Api Keys.java:68)
  • -rw-r--r-- 1 cloudera cloudera 634770 2015 年 11 月 20 日 kafka-clients-0.9.0.0.jar -rw-r--r-- 1 cloudera cloudera 4960620 2015 年 11 月 20 日 kafka_2 .10-0.9.0.0.jar -rw-r--r-- 1 cloudera cloudera 48565 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-javadoc.jar
  • -rw-r--r-- 1 cloudera cloudera 621175 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-sources.jar -rw-r--r-- 1 cloudera cloudera 2802225 11 月 20 日2015 kafka_2.10-0.9.0.0-scaladoc.jar -rw-r--r-- 1 cloudera cloudera 2049056 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-test.jar -rw-r--r-- 1 cloudera cloudera 821 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-test.jar.asc -rw-r--r-- 1 cloudera cloudera 821 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-sources.jar.asc -rw- r--r-- 1 cloudera cloudera 821 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-scaladoc.jar.asc
  • -rw-r--r-- 1 cloudera cloudera 821 2015 年 11 月 20 日 kafka_2.10-0.9.0.0-javadoc.jar.asc -rw-r--r-- 1 cloudera cloudera 821 2015 年 11 月 20 日 kafka_2.10-0.9.0.0.jar.asc

标签: apache-kafka kafka-producer-api


【解决方案1】:

根据您在 cmets (java.lang.ArrayIndexOutOfBoundsException: 18) 中提到的错误,我会说您的客户端库版本和代理版本不匹配。客户端库应该是

因此,请仔细检查您要连接的代理版本,然后再仔细检查您的客户端库版本。一旦它们匹配或兼容,您就可以开始使用了!

【讨论】:

  • 感谢您的回答,因为我是新手,您可以建议如何检查。我已经检查了库中的 jar 文件,它看起来不错,因为服务器和客户端 jar 的版本相同。
  • ERROR 处理器未捕获异常。 (kafka.network.Processor) java.lang.ArrayIndexOutOfBoundsException: 18 at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68) at org.apache.kafka.common.requests.AbstractRequest.getRequest( AbstractRequest.java:39) 在 kafka.network.RequestChannel$Request.(RequestChannel.scala:79) 在
  • kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426) at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421) at scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) at scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) at scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72) at scala .collection.AbstractIterable.foreach(Ite​​rable.scala:54) at kafka.network.Processor.run(SocketServer.scala:421) at java.lang.Thread.run(Thread.java:745)
  • 您好,我也提到了库版本和完整的 Staketrace。
【解决方案2】:

嗨,我这可能是因为一些 kafka 版本不匹配。我已经重新安装了 kafka 和 sbt。现在它开始正常工作了。

【讨论】:

    猜你喜欢
    • 2019-04-23
    • 2018-10-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-18
    • 2020-09-14
    • 2018-09-30
    相关资源
    最近更新 更多