【发布时间】:2021-05-28 20:29:04
【问题描述】:
我是 Kafka 和 Spark 的新手。我已通过 Kafka 生产者传递消息并尝试在火花流中读取,但在 main 方法中出现错误。代码如下。
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 Streaming Example.py
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import time
# In[ ]:
if __name__ == "__main__":
spark = SparkSession.builder.master("local").appName("Kafka Spark Demo").getOrCreate()
sc=spark.sparkContext
ssc=StreamingContext(sc, 20)
message = KafkaUtils.createDirectStream(ssc, topic=['testtopic'], KafkaParams = {"metadata.broker.list": "localhost:9092"})
data = message.map(lambda x: x[1])
def functordd(rdd):
try:
rdd1=rdd.map(lambda x: json.loads(x))
df = spark.read.json(rdd1)
df.show()
sf.createOrReplaceTimeView("Test")
df1=spark.sql("select iss_position.latitude, iss_position.longitude, message, timestamp from Test")
df1.write.format('csv').mode('append').save("testing")
except:
pass
data.foreachRDD(functordd)
sc.stop()
【问题讨论】:
-
请将您的完整错误显示为文本,而不是图像。你用什么命令来运行代码?
-
注意:
pyspark.streaming.kafka在 Spark 2.4 前后被删除;如果要对 Kafka 数据执行 sql 操作,则应该使用结构化流式处理 -
>spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 Streaming Example.py 我正在尝试使用此命令运行代码.
-
在 py 文件周围加上引号(或删除它们)...您使用的是什么版本的 Spark?
-
我使用的是 spark 2.4.7 和 python 版本 3.7.9
标签: windows apache-spark pyspark apache-kafka