【发布时间】:2019-08-04 07:55:00
【问题描述】:
我正在尝试使用 yarn 与 Kafka 一起运行 spark 流应用程序。我收到以下堆栈跟踪错误-
原因:org.apache.kafka.common.config.ConfigException:缺少所需的配置“partition.assignment.strategy”,没有默认值。 在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer.(CachedKafkaConsumer.scala:45) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194) 在 org.apache.spark.streaming.kafka010.KafkaRDDIterator.(KafkaRDD.scala:252) 在 org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:109) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
这是我如何使用 spark 流创建 KafkaStream 的代码的 sn-p-
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "*boorstrap_url:port*",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "annotation-test",
//Tried commenting and uncommenting this property
//"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("*topic-name*")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())
我已经浏览了以下帖子 -
- https://issues.apache.org/jira/browse/KAFKA-4547
- Pyspark Structured Streaming Kafka configuration error
据此,我已将我的 fat jar 中的 kafka util jar 更新为 0.10.2.0 版本,从默认打包自 spark-stream-kafka- 的 0.10.1.0 jar 作为瞬态依赖。当我通过将master设置为本地来在单个节点上运行它时,我的工作也可以正常工作。我正在运行 spark 2.3.1 版本。
【问题讨论】:
-
您可以尝试将您的策略更改为“org.apache.kafka.clients.consumer.RoundRobinAssignor”还是代替“partition.assignment.strategy”尝试设置“consumer.partition.assignment.strategy”
-
我试过了,然后我得到了错误 - java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
-
尝试设置“consumer.partition.assignment.strategy”而不是“partition.assignment.strategy”。
-
我遇到了同样的错误——缺少所需的配置“partition.assignment.strategy”,它没有默认值。
-
我认为这个问题可能有用:stackoverflow.com/questions/43035542/…
标签: apache-spark apache-kafka spark-streaming spark-streaming-kafka