【问题标题】:How to transform dataframes to rdds in structured streaming?如何在结构化流中将数据帧转换为 rdds?
【发布时间】:2020-04-10 21:33:39
【问题描述】:

我使用 pyspark 流从 kafka 获取数据,结果是一个数据帧,当我将数据帧转换为 rdd 时,它出错了:

Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

正确的版本代码:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

这是错误的版本代码:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

为什么它不能将数据帧转换为 rdd?当我想在 pyspark 流中将数据帧转换为 rdd 时该怎么做?

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    如果您的 spark 版本是 2.4.0 及更高版本,那么您可以使用以下替代方法来处理数据帧的每一行。

    query=df.writeStream.foreach(Customized method to work on each row of dataframe rather than RDD).outputMode("update").start()
        ssc.start()
        ssc.awaitTermination()
    

    【讨论】:

    • 请在您的答案中添加一些解释,以便其他人可以从中学习
    • 例如:foreach 下面的类将解析结构化流数据帧中的每一行并将其传递给类 SendToKudu_ForeachWriter,该类将具有将其转换为 rdd 的逻辑。传递的行将采用数据帧的形式,因此可以使用 df.rdd 将其转换为 rdd。 class SendToKudu_ForeachWriter(): def process(self, row): #Your logic to work on row passing as dataframe here" if name == 'main': query=streaming_parsed_dataframe .writeStream.foreach(SendToKudu_ForeachWriter()).outputMode("update").start() query.awaitTermination()
    • 请通过编辑为您的答案添加所有说明
    【解决方案2】:

    这个 RDD 方面根本不被支持。 RDD 是遗留的,Spark Structured Streaming 是基于 DF/DS 的。流式或批处理的通用抽象。

    【讨论】:

    • 如果不支持,如何使用pyspark流将Kafka数据读取为rdds,我的spark版本是2.4.3,kafka版本是2.1.0?
    • 结构化流式传输不同。
    • @littlely Spark Streaming != 结构化流媒体
    • 是否可以将 spark Dataframe 转换为流数据帧?
    • @Innovator-programmer 不,不是
    【解决方案3】:

    要对 Dataframe 字段执行特定操作,您可以使用 UDF 函数,甚至可以创建 Spark 自定义转换器。但是有一些 Dataframe 操作不支持,例如转换为 RDD。

    【讨论】:

      【解决方案4】:

      结构化流在 spark-sql 引擎上运行。不支持将数据帧或数据集转换为 RDD。

      【讨论】:

        猜你喜欢
        • 2021-10-23
        • 1970-01-01
        • 2018-12-31
        • 1970-01-01
        • 2019-03-30
        • 1970-01-01
        • 1970-01-01
        • 2018-03-28
        • 2018-03-21
        相关资源
        最近更新 更多