【问题标题】:How to create a DataFrame out of rows while retaining existing schema?如何在保留现有架构的同时从行中创建 DataFrame?
【发布时间】:2015-12-23 15:26:18
【问题描述】:

如果我调用 map 或 mapPartition 并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?

目前我正在做类似的事情:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows,columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartition(combine)

【问题讨论】:

    标签: python pandas apache-spark pyspark pyspark-sql


    【解决方案1】:

    Spark >= 2.3.0

    从 Spark 2.3.0 开始,可以按分区或组使用 Pandas SeriesDataFrame。例如:

    火花

    创建本地 PySpark 的自然方式是什么

    没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢不能嵌套动作或转换的另一个视角。

    或 Pandas 数据帧

    这相对容易,但你至少要记住几件事:

    • Pandas 和 Spark DataFrames 甚至不完全等同。它们是不同的结构,具有不同的属性,通常不能用另一种替代。
    • 分区可以为空。
    • 看起来您正在传递字典。请记住,基本 Python 字典是无序的(例如,与 collections.OrderedDict 不同)。因此,传递列可能无法按预期工作。
    import pandas as pd
    
    rdd = sc.parallelize([
        {"x": 1, "y": -1}, 
        {"x": -3, "y": 0},
        {"x": -0, "y": 4}
    ])
    
    def combine(iter):
        rows = list(iter)
        return [pd.DataFrame(rows)] if rows else []
    
    rdd.mapPartitions(combine).first()
    ##    x  y
    ## 0  1 -1
    

    【讨论】:

    • 谢谢你的解释帮助。该方法类似于我现在使用的方法,但是除了列名之外,是否有一种自然的方式来传递行模式?
    • 我不确定我是否理解这个问题。 Pandas DataFrames 使用可以在闭包中传递的列和 dtype 参数,但 Spark 无法识别这些参数。如果你想要一个 Spark DataFrame,你应该传递这个 createDataFrame 并在那里传递架构(它不同于 Pandas dtypes)。
    • 我想维护模式,就像我想象的正常 toPandas 一样。如果我知道如何调用 createDataFrame 并维护行模式,那么调用 createDataFrame 然后调用 toPandas 就可以了。虽然我猜可能会降低效率?
    • toPandas 简单地收集并创建与 Spark 数据帧具有相同 columns 名称的本地数据结构。不多也不少。 Row(如pyspark.sql.Row)没有架构——它只是一个tuple,添加了一些方法和__fields__存储名称的属性。
    • 有趣,toPandas 也不强制列类型?并且类型没有内置到行中?
    【解决方案2】:

    你可以使用toPandas()

    pandasdf = mydf.toPandas()
    

    【讨论】:

    • 这不能回答我的问题,我需要它在分区的 map 调用中运行。如果有一张地图可以传递数据框,那也很好。
    • 对不起,我听不懂map that passes dataframe。火花数据帧的输出预期是什么?您想为每个分区创建数据框吗?
    • mapPartition 为每个分区传递一个 Row 迭代器,所以我不能使用数据框函数
    【解决方案3】:

    实际上可以在执行器中将 Spark 行转换为 Pandas,并最终使用 mapPartitions 从这些输出中创建 Spark DataFrame。 See my gist in Github

    # Convert function to use in mapPartitions
    def rdd_to_pandas(rdd_):
        # convert rows to dict
        rows = (row_.asDict() for row_ in rdd_)
        # create pandas dataframe
        pdf = pd.DataFrame(rows)
    
        # Rows/Pandas DF can be empty depending on patiition logic.
        # Make sure to check it here, otherwise it will throw untrackable error
        if len(pdf) > 0:
            #
            # Do something with pandas DataFrame 
            #
            pass
    
        return pdf.to_dict(orient='records')
    
    # Create Spark DataFrame from resulting RDD
    rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
    

    【讨论】:

      【解决方案4】:

      为了创建 Spark SQL 数据框,您需要一个配置单元上下文:

      hc = HiveContext(sparkContext)
      

      使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:

      sparkSQLdataframe = hc.inferSchema(rows)  
      

      【讨论】:

      • 好点。这仅适用于 RDD。因此,您可以在调用 mapPartition 之前为变量“combine”调用它。
      • 另外,如果您立即将数据作为数据框读取会更好。您可以对 JSON、Hive 表等多个输入源执行此操作
      • 我有一个数据框,但是当我调用 mapPartition 时,每个从节点都会看到一个行迭代器,为方便起见,我想合并这些行
      • 啊,现在我明白了。不幸的是,我想不出一个好的解决方案。如果您将行组合成一个数据框,您希望应用哪个数据框操作?
      • 最终到Pandas,然后是pandas函数。上面概述的方法可行,但感觉很hacky
      猜你喜欢
      • 1970-01-01
      • 2015-10-07
      • 2019-12-18
      • 2016-09-25
      • 2018-07-22
      • 2020-09-12
      • 1970-01-01
      相关资源
      最近更新 更多