【Title】: py4j.protocol.Py4JJavaError: An error occurred while calling o22.start
【Posted】: 2020-09-10 20:24:01
【Question】:

I am trying to get Spark Streaming and Kafka to work together on Ubuntu, but I have run into a problem.

I have confirmed that Kafka itself works.

In the first terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties

In the second terminal:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest

Then I produce some data:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
hello hadoop
hello spark

In the third terminal:

cd /usr/local/spark/mycode/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py localhost:2181 wordsendertest

The code of kafkaWordCount.py:

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys

if __name__ == "__main__":
   if len(sys.argv) != 3:
      print("usage:KafkaWordCount.py<zk><topic>",file=sys.stderr)
      exit(-1)
   sc = SparkContext(appName="PythonStreamingKafkaWordCount")
   ssc = StreamingContext(sc,1)
   zkQuorum,topic = sys.argv[1:]
   kvs = KafkaUtils.createStream(ssc,zkQuorum,"spark-streaming-consumer",{topic:1})
   lines = kvs.map(lambda x:x[1])
   counts = lines.flatMap(lambda x:x.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
   counts.pprint
   ssc.start()
   ssc.awaitTermination()

The error I get:

Traceback (most recent call last):
  File "/usr/local/spark/mycode/kafka/./KafkaWordCount.py", line 20, in <module>
    ssc.start()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 196, in start
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Please help! Thanks!

【Discussion】:

    Tags: python apache-spark pyspark apache-kafka


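    【Solution 1】:

    You forgot the parentheses () after counts.pprint.

    Change counts.pprint to counts.pprint() and it will work. Without the parentheses, the line only references the method and never calls it, so no output operation is registered on the DStream. That is exactly what the exception "No output operations registered, so nothing to execute" reports when ssc.start() runs.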
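To illustrate why the missing parentheses matter, here is a minimal sketch in plain Python that runs without Spark or Kafka. The FakeStream class and its output_ops list are hypothetical stand-ins invented for this illustration and are not part of pyspark. They only mimic the idea that an output operation is registered when the method is actually called.

```python
class FakeStream:
    """Hypothetical stand-in for a DStream; NOT a pyspark class."""

    def __init__(self):
        # Output operations registered so far (illustrative only).
        self.output_ops = []

    def pprint(self, num=10):
        # Calling the method is what registers the output operation.
        self.output_ops.append(("pprint", num))


counts = FakeStream()

# Attribute access only: evaluates to a bound method object and discards it.
counts.pprint
ops_without_call = len(counts.output_ops)

# An actual call: the output operation is now registered.
counts.pprint()
ops_with_call = len(counts.output_ops)

print(ops_without_call, ops_with_call)
```

In the question's script, the same distinction applies to the line before ssc.start(): writing counts.pprint() there is the one-character-pair fix the answer describes.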
