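【Posted】:2018-02-21 09:38:13
【Question】:
I was previously able to run Kafka Structured Streaming programs. Suddenly, all of my Structured Streaming Python programs started failing with an error. I then took the basic Kafka Structured Streaming example from the Spark website, and it fails with the same error.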
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
This is the spark-submit command I am using:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py
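For reference, the `--packages` coordinate in the command above follows the pattern `org.apache.spark:spark-sql-kafka-0-10_<Scala binary version>:<Spark version>`. The following is a minimal sketch of assembling that command from its parts, which makes it easier to compare the two version numbers against the local Spark installation. The helper names `kafka_source_coordinate` and `spark_submit_command` are hypothetical, and nothing in the post establishes that a version mismatch is the cause of the error.

```python
def kafka_source_coordinate(scala_binary_version, spark_version):
    """Build the Maven coordinate of Spark's Kafka source for Structured Streaming.

    Hypothetical helper for illustration; it only formats a string.
    """
    return (
        "org.apache.spark:spark-sql-kafka-0-10_"
        f"{scala_binary_version}:{spark_version}"
    )


def spark_submit_command(script_path, scala_binary_version, spark_version):
    """Return the spark-submit argument list that pulls in the Kafka source."""
    coordinate = kafka_source_coordinate(scala_binary_version, spark_version)
    return ["spark-submit", "--packages", coordinate, script_path]


if __name__ == "__main__":
    # The values below are the ones that appear in the command from the post.
    print(" ".join(spark_submit_command("Structured_streaming.py", "2.11", "2.2.0")))
```

If PySpark is importable, `pyspark.__version__` gives the Spark version to compare against the last part of the coordinate.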
This is the code I took from the Spark GitHub repository:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# bootstrapServers, subscribeType and topics are not defined in this excerpt;
# the upstream Spark example reads them from the command-line arguments.

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()
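To make clear what the query above is expected to print once it runs, here is a rough plain-Python sketch of the same logic: split each line on a single space, count the words, and emit the full running totals after every batch, as the `complete` output mode does. It does not use Spark or Kafka. The function name `running_word_counts` and the modelling of micro-batches as lists of strings are assumptions made only for illustration.

```python
from collections import Counter


def running_word_counts(batches):
    """Return a snapshot of the cumulative word counts after each batch.

    `batches` is an iterable of batches, each batch being a list of text lines.
    This stands in for the micro-batches a streaming query would process.
    """
    counts = Counter()
    snapshots = []
    for batch in batches:
        for line in batch:
            # Equivalent of explode(split(value, ' ')): one word per row.
            counts.update(line.split(" "))
        # 'complete' mode outputs the whole result table after each trigger.
        snapshots.append(dict(counts))
    return snapshots


if __name__ == "__main__":
    for snapshot in running_word_counts([["a b", "a"], ["b c"]]):
        print(snapshot)
```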
【Discussion】:
Tags: apache-spark pyspark apache-kafka spark-structured-streaming