【问题标题】:Spark streaming and kafka Missing required configuration "partition.assignment.strategy" which has no default valueSpark流和kafka缺少所需的配置“partition.assignment.strategy”,没有默认值
【发布时间】:2019-08-04 07:55:00
【问题描述】:

我正在尝试使用 yarn 与 Kafka 一起运行 spark 流应用程序。我收到以下堆栈跟踪错误-

原因:org.apache.kafka.common.config.ConfigException:缺少所需的配置“partition.assignment.strategy”,没有默认值。 在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer.(CachedKafkaConsumer.scala:45) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194) 在 org.apache.spark.streaming.kafka010.KafkaRDDIterator.(KafkaRDD.scala:252) 在 org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:109) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

这是我如何使用 spark 流创建 KafkaStream 的代码的 sn-p-

        val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "*boorstrap_url:port*",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "annotation-test",
  //Tried commenting and uncommenting this property      
  //"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("*topic-name*")

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())

我已经浏览了以下帖子 -

  1. https://issues.apache.org/jira/browse/KAFKA-4547
  2. Pyspark Structured Streaming Kafka configuration error

据此,我已将我的 fat jar 中的 kafka util jar 更新为 0.10.2.0 版本,从默认打包自 spark-stream-kafka- 的 0.10.1.0 jar 作为瞬态依赖。当我通过将master设置为本地来在单个节点上运行它时,我的工作也可以正常工作。我正在运行 spark 2.3.1 版本。

【问题讨论】:

  • 您可以尝试将您的策略​​更改为“org.apache.kafka.clients.consumer.RoundRobinAssignor”还是代替“partition.assignment.strategy”尝试设置“consumer.partition.assignment.strategy”
  • 我试过了,然后我得到了错误 - java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
  • 尝试设置“consumer.partition.assignment.strategy”而不是“partition.assignment.strategy”。
  • 我遇到了同样的错误——缺少所需的配置“partition.assignment.strategy”,它没有默认值。
  • 我认为这个问题可能有用:stackoverflow.com/questions/43035542/…

标签: apache-spark apache-kafka spark-streaming spark-streaming-kafka


【解决方案1】:

kafka-clients-*.jar 添加到您的 spark jar 文件夹中。 kafka-clients-*.jarkafka-*/lib 目录中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-13
    • 1970-01-01
    • 2017-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-23
    • 1970-01-01
    相关资源
    最近更新 更多