【Question title】: Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
【Posted】: 2020-04-08 11:00:02
【Question】:

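I am trying to connect to a Confluent Kafka cluster with a simple consumer written in Scala, but the connection fails with this exception:

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

I can connect to the same cluster from other machines. I have gone through similar posts on Stack Overflow, but none of them helped in my case.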


    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.consumer.KafkaConsumer

    import scala.collection.JavaConverters._

    object kafkaconnect {

      def main(args: Array[String]): Unit = {
        val TOPIC = "test"
        val props = new Properties()
        props.put("bootstrap.servers", "conflunetcluster:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("group.id", "something")
        props.put("enable.auto.commit", "true")
        props.put("auto.commit.interval.ms", "1000")
        props.put("session.timeout.ms", "30000")
        props.put("enable.partition.eof", "false")
        println(props)
        println("beforeconsume")
        val consumer = new KafkaConsumer[String, String](props)
        println(consumer)
        consumer.subscribe(Collections.singletonList(TOPIC))
        consumer.listTopics()
        println("2")
        while (true) {
          val records = consumer.poll(100)
          for (record <- records.asScala) {
            println(record)
          }
        }
      }
    }


build.sbt:

    name := "kafkatest"
    version := "0.1"
    scalaVersion := "2.11.12"
    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.0"

The stack trace follows:

"C:\Program Files\Java\jdk1.8.0_192\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Educational Edition 2018.3.1\lib\idea_rt.jar=59817:C:\Program Files\JetBrains\IntelliJ IDEA Educational Edition 2018.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_192\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_192\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_192\jre\lib\rt.jar;C:\Users\756661\IdeaProjects\kafkatest\target\scala-2.11\classes;C:\Users\756661\.ivy2\cache\org.scala-lang\scala-library\jars\scala-library-2.11.12.jar;C:\Users\756661\.ivy2\cache\org.slf4j\slf4j-api\jars\slf4j-api-1.7.25.jar;C:\Users\756661\.ivy2\cache\org.apache.kafka\kafka-clients\jars\kafka-clients-1.0.0.jar;C:\Users\756661\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.jar;C:\Users\756661\.ivy2\cache\org.xerial.snappy\snappy-java\bundles\snappy-java-1.1.4.jar" kafkaconnect
{key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, auto.commit.interval.ms=1000, bootstrap.servers=ashaplq00005:9092, enable.partition.eof=false, enable.auto.commit=true, group.id=something, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, session.timeout.ms=30000}
beforeconsume
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
    at kafkaconnect$.main(kafkaconnect.scala:29)
    at kafkaconnect.main(kafkaconnect.scala)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:64)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:698)
    ... 4 more

Process finished with exit code 1

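【Comments】:

  • Can you post the stack trace? There should be a nested exception.
  • Hi Manoj, thanks. I have added the trace.
  • "No resolvable bootstrap urls given in bootstrap.servers": this looks like a DNS problem. Can you try pinging the host name (conflunetcluster) from your machine?
  • Sure, Manoj. I am on a corporate LAN, so I need to raise a ticket with the local IT team to get this looked at. I will post an update.
  • Note: ideally the consumer should run on a background thread. Example in Java: github.com/omkreddy/kafka-examples/blob/master/consumer/src/…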
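
The nested exception requested in the first comment can also be printed from code rather than read off the console. The following is a minimal sketch, not part of the original thread: `CauseChain` and `causes` are hypothetical names, and the two exceptions in `main` are plain JDK stand-ins that only mimic the shape of the trace in the question, not real Kafka classes.

```scala
object CauseChain {

  /** Returns the throwable followed by each nested cause, outermost first. */
  def causes(t: Throwable): List[Throwable] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null).toList

  def main(args: Array[String]): Unit = {
    // Stand-ins for the KafkaException / ConfigException pair in the trace.
    val inner = new IllegalArgumentException(
      "No resolvable bootstrap urls given in bootstrap.servers")
    val outer = new RuntimeException("Failed to construct kafka consumer", inner)

    // Print one line per level of the cause chain.
    causes(outer).foreach { e =>
      println(s"${e.getClass.getSimpleName}: ${e.getMessage}")
    }
  }
}
```

In the consumer code, the same helper could be called from a `catch` block around `new KafkaConsumer[String, String](props)`; the last element of the list is the root cause.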

Tags: scala apache-kafka kafka-consumer-api


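【Answer 1】:

According to the stack trace, your consumer cannot connect to conflunetcluster:9092 because that host name does not resolve.

Could it be a typo? Perhaps it should be confluentcluster:9092.

First try connecting to your cluster with the command-line tools (for example, bin/kafka-topics.sh --list --bootstrap-server localhost:9092).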
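
A quick way to separate a DNS problem from a Kafka problem is to check, before constructing the consumer, whether each `bootstrap.servers` entry resolves on the machine that fails. The sketch below uses only the JDK; `BootstrapCheck`, `resolves`, and `check` are hypothetical names, and the logic only approximates the kind of validation the client performs, it is not the Kafka implementation. It assumes plain `host:port` entries and does not handle bracketed IPv6 literals.

```scala
import java.net.InetSocketAddress
import scala.util.Try

object BootstrapCheck {

  /** True if `entry` is "host:port" with a valid port and a host that resolves. */
  def resolves(entry: String): Boolean = {
    val idx = entry.lastIndexOf(':')
    if (idx <= 0) false
    else {
      val host = entry.substring(0, idx)
      Try(entry.substring(idx + 1).toInt).toOption
        .filter(p => p >= 0 && p <= 65535)
        // InetSocketAddress attempts the lookup in its constructor;
        // isUnresolved reports whether that lookup failed.
        .exists(port => !new InetSocketAddress(host, port).isUnresolved)
    }
  }

  /** Checks every comma-separated entry of a bootstrap.servers value. */
  def check(bootstrapServers: String): Seq[(String, Boolean)] =
    bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty)
      .map(e => e -> resolves(e)).toSeq

  def main(args: Array[String]): Unit = {
    // Defaults to the value from the question; pass another value as the first argument.
    val servers = args.headOption.getOrElse("conflunetcluster:9092")
    check(servers).foreach { case (entry, ok) =>
      println(s"$entry -> " + (if (ok) "resolves" else "does NOT resolve"))
    }
  }
}
```

If every entry is reported as not resolving, the fix lies in the host name or the DNS configuration rather than in the consumer settings.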

【Comments】:

  • The problem was my DNS. After I checked with the network team, they changed the DNS and proxy settings, and it now works fine. Thanks to Manoj Vadehra and pgras.