【Question title】: Apache Spark Streaming: running the Python example
【Posted】: 2018-03-02 01:35:53
【Question】:

I am trying to run the Python Spark Streaming job given in the examples directory:

https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html

"""
 Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 Usage: kafka_wordcount.py <zk> <topic>
 To run this on your local machine, you need to setup Kafka and create a producer first, see
 http://kafka.apache.org/documentation.html#quickstart

 and then run the example
    `$ bin/spark-submit --jars \
      external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
      examples/src/main/python/streaming/kafka_wordcount.py \
      localhost:2181 test`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()  # Spark Streaming needs at least one output operation before ssc.start()

    ssc.start()
    ssc.awaitTermination()

I downloaded spark-streaming-kafka-0-8_2.11-2.1.0.jar to a local directory and ran my spark-submit command:

bin/spark-submit --jars ../external/spark-streaming-kafka*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test

I get the following error:

Exception in thread "Thread-3" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition

【Comments】:

    Tags: python apache-spark pyspark spark-streaming


    【Answer 1】:

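    You need to use the spark-streaming-kafka-assembly jar, not the plain spark-streaming-kafka jar. The assembly jar bundles all dependencies, including the Kafka client, which is where the missing class kafka/common/TopicAndPartition comes from.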
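
One way to check which jar actually carries the missing class is to look inside it, since a jar is an ordinary zip archive. The following is a minimal sketch, not part of the Spark tooling: the helper name `jar_contains_class` is hypothetical, and the class name is the one from the error in the question.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar (a zip archive) contains the given class.

    class_name may use the slash-separated form printed in the JVM error
    ("kafka/common/TopicAndPartition") or the dotted form.
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Hypothetical usage; the path assumes the jar location from the question:
# jar_contains_class("../external/spark-streaming-kafka-0-8_2.11-2.1.0.jar",
#                    "kafka/common/TopicAndPartition")
```

If the explanation above holds, the plain jar should lack the class while the assembly jar should contain it.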

    【Discussion】:

    • This works: bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test. But when I download the jar and run it with --jars instead, I get the error: bin/spark-submit --jars ../external/spark-streaming-kafka-0-8_2.11-2.2.0.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
    • That is because --packages downloads all transitive dependencies. If you supply the package with --jars, you have to provide every required class yourself, so use the assembly jar.
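
The two ways of supplying the Kafka dependency discussed above can be sketched as a small command builder. This is only an illustration: the function name `kafka_submit_args` and its parameters are hypothetical, the default versions are the ones from the comment above, and the assembly jar path is whatever you downloaded.

```python
def kafka_submit_args(script, script_args, scala="2.11", spark="2.2.0",
                      assembly_jar=None):
    """Build a spark-submit argument list for the Kafka 0.8 integration.

    With assembly_jar=None the dependency is given as Maven coordinates via
    --packages, so spark-submit resolves transitive dependencies itself.
    Otherwise the given jar is passed via --jars and must already contain
    every class it needs (hence the assembly jar).
    """
    if assembly_jar is not None:
        dependency = ["--jars", assembly_jar]
    else:
        coordinates = "org.apache.spark:spark-streaming-kafka-0-8_{}:{}".format(
            scala, spark)
        dependency = ["--packages", coordinates]
    return ["bin/spark-submit"] + dependency + [script] + list(script_args)


# Reproduces the --packages command that worked in the comment above.
args = kafka_submit_args(
    "examples/src/main/python/streaming/kafka_wordcount.py",
    ["localhost:2181", "test"],
)
print(" ".join(args))
```

The list form can be handed to `subprocess.run` from the Spark home directory; passing `assembly_jar=...` switches to the `--jars` variant.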