【问题标题】:How to export a table dataframe in PySpark to csv?如何将 PySpark 中的表数据框导出到 csv?
【发布时间】:2015-10-01 20:02:39
【问题描述】:

我使用的是 Spark 1.3.1 (PySpark),并且我使用 SQL 查询生成了一个表。我现在有一个对象是 DataFrame。我想将这个 DataFrame 对象(我称之为“表”)导出到 csv 文件,以便我可以操作它并绘制列。如何将DataFrame“表”导出到 csv 文件?

谢谢!

【问题讨论】:

    标签: python apache-spark dataframe apache-spark-sql export-to-csv


    【解决方案1】:

    如果数据帧适合驱动程序内存并且您想保存到本地文件系统,您可以使用toPandas 方法将Spark DataFrame 转换为本地Pandas DataFrame,然后只需使用to_csv

    df.toPandas().to_csv('mycsv.csv')
    

    否则你可以使用spark-csv:

    • 火花 1.3

      df.save('mycsv.csv', 'com.databricks.spark.csv')
      
    • 火花 1.4+

      df.write.format('com.databricks.spark.csv').save('mycsv.csv')
      

    在 Spark 2.0+ 中可以直接使用csv 数据源:

    df.write.csv('mycsv.csv')
    

    【讨论】:

    • 如果你有 spark 数据帧,你可以使用df.write.csv('/tmp/lookatme/'),这将在/tmp/lookatme 中删除一组 csv 文件使用 spark 比在 pandas 中序列化它要快得多。唯一的缺点是您最终会得到一组 csv,而不是一个,如果目标工具不知道如何连接它们,您将需要自己做。
    • 从 spark 中获取 csv 是一件多么大的事情。第一个解决方案的有趣之处在于 to_csv 无需导入 Pandas 即可工作。 .toPandas 是 Spark 的一部分,也许它隐式导入它..
    • 如果您坚持使用单个输出文件,您应该可以使用df.coalesce(1).write.csv('mycsv.csv')
    • @Txangel 感谢您的回答。但是,当我使用它时,它运行时没有任何错误,但我找不到在目标位置创建的 any csv。有什么想法吗?
    • 使用 df.write.csv('mycsv.csv') 将 csv 导出到 hdfs 环境。如何在我的本地环境中获取它?
    【解决方案2】:

    对于 Apache Spark 2+,为了将数据帧保存到单个 csv 文件中。使用以下命令

    query.repartition(1).write.csv("cc_out.csv", sep='|')
    

    这里1表示我只需要一个csv分区。您可以根据自己的要求进行更改。

    【讨论】:

    • 如这里所示:spark.apache.org/docs/2.2.0/api/python/… 建议使用 coalesce() 而不是 repartition() 以提高性能(“如果您要减少此 RDD 中的分区数,请考虑使用 coalesce,它可以避免执行随机播放。”)
    • @Seastar:虽然合并可能在几个用例中具有优势,但您的评论不适用于这种特殊情况。如果你想在你的 hdfs(或其他)中有一个 .csv,你通常需要一个文件而不是几十个文件分布在你的集群中(做repartition(1) 的整体感觉。你需要为此打乱数据方式,所以合并在更大的范围内根本没有帮助。
    【解决方案3】:

    如果您不能使用 spark-csv,您可以执行以下操作:

    df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
    

    如果您需要处理带有换行符或逗号的字符串,这将不起作用。使用这个:

    import csv
    import cStringIO
    
    def row2csv(row):
        buffer = cStringIO.StringIO()
        writer = csv.writer(buffer)
        writer.writerow([str(s).encode("utf-8") for s in row])
        buffer.seek(0)
        return buffer.read().strip()
    
    df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
    

    【讨论】:

      【解决方案4】:

      您需要将Dataframe重新分区到单个分区中,然后以Unix文件系统格式定义文件的格式,路径和其他参数,然后就可以了,

      df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
      

      阅读更多关于repartition function 阅读更多关于save function

      但是,重新分区是一个代价高昂的函数,而 toPandas() 是最糟糕的。尝试在以前的语法中使用 .coalesce(1) 而不是 .repartition(1) 以获得更好的性能。

      阅读更多repartition vs coalesce functions

      【讨论】:

        【解决方案5】:

        这个怎么样(如果你不想要一个班轮)?

        for row in df.collect():
            d = row.asDict()
            s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
            f.write(s)
        

        f 是一个打开的文件描述符。分隔符也是 TAB 字符,但很容易更改为您想要的任何内容。

        【讨论】:

          【解决方案6】:
          '''
          I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
          '''
          
          import shutil
          import os
          import glob
          
          path = 'test_write'
          #write single csv
          students.repartition(1).write.csv(path)
          
          #rename and relocate the csv
          shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')
          
          #remove additional directory
          shutil.rmtree(os.getcwd()+'\\'+path)
          

          【讨论】:

            【解决方案7】:

            使用 PySpark

            在 Spark 3.0+ 中用 csv 写入的最简单方法

            sdf.write.csv("/path/to/csv/data.csv")
            

            这可以根据您使用的 spark 节点的数量生成多个文件。如果您想在单个文件中获取它,请使用重新分区。

            sdf.repartition(1).write.csv("/path/to/csv/data.csv")
            

            使用熊猫

            如果你的数据不多,可以保存在本地python中,那么你也可以使用pandas

            sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
            

            使用考拉

            sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)
            

            【讨论】:

              【解决方案8】:

              尝试 display(df) 并在结果中使用下载选项。请注意:使用此选项只能下载 100 万行,但速度非常快。

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 2018-10-04
                • 2020-06-15
                • 1970-01-01
                • 1970-01-01
                • 2020-02-01
                • 2018-06-27
                • 2021-08-16
                • 2020-10-26
                相关资源
                最近更新 更多