【问题标题】:How we save a Huge pyspark dataframe?我们如何保存一个巨大的 pyspark 数据框?
【发布时间】:2019-12-01 00:14:28
【问题描述】:

我有一个很大的 pyspark 数据框,我想将它保存在 myfile (.tsv) 中以供进一步使用。为此,我定义了以下代码:

with open(myfile, "a") as csv_file:
        writer = csv.writer(csv_file, delimiter='\t')
        writer.writerow(["vertex" + "\t" + "id_source" + "\t" + "id_target" + "\t"+ "similarity"])

        for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
            part_rdd = joinDesrdd_df.rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
            data_from_part_rdd = part_rdd.collect()
            vertex_list = set()

            for row in data_from_part_rdd:
                writer.writerow([....])

        csv_file.flush() 

我的代码无法通过这一步,它会产生异常:

1.

 in the workers log:
19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/2 finished with state KILLED exitStatus 143
14: 19/07/22 08:58:57 INFO ExternalShuffleBlockResolver: Application app-20190722085320-0000 removed, cleanupLocalDirs = true
14: 19/07/22 08:58:57 INFO Worker: Cleaning up local directories for application app-20190722085320-0000
 5: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/1 finished with state KILLED exitStatus 143
 7: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/14 finished with state KILLED exitStatus 143
...

2- 在作业执行日志中:

Traceback (most recent call last):
  File "/project/6008168/tamouze/RWLastVersion2207/module1.py", line 306, in <module>
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 88, in rdd
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o528.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
+- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])

我不知道为什么我的这段代码会产生异常。注意,小数据上执行还可以,大数据不行。

另外,请问保存 pysaprk 数据框以供进一步使用的最佳方法是什么?

更新: 我试图用以下循环替换上面的内容:

joinDesrdd_df.withColumn("par_id",col('id_source')%50).repartition(50, 'par_id').write.format('parquet').partitionBy("par_id").save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

也得到类似的异常:

19/07/22 21:10:18 INFO TaskSetManager: Finished task 653.0 in stage 11.0 (TID 2257) in 216940 ms on 172.16.140.237 (executor 14) (1017/1024)
19/07/22 21:11:32 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(par_id#328, 50)
+- *(12) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258, par_id#328])
   +- Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
      +- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])
         +- *(11) Project [id_source#263, id_target#292, similarity#258]
            +- *(11) BroadcastHashJoin [instance_target#65], [instance#291], Inner, BuildRight

【问题讨论】:

  • 数据帧写入方法你知道吗? (joinDesrdd_df.write.csv)
  • @ernest_k 请查看更新后的帖子

标签: apache-spark pyspark pyspark-sql


【解决方案1】:

我建议使用 Spark 原生写入功能:

joinDesrdd_df.write.format('csv').option("header", "true").save("path/to/the/output/csv/folder")

Spark 会将数据帧的每个分区作为单独的 csv 文件保存到指定的路径中。您可以通过repartition 方法控制文件的数量,这将使您能够控制每个文件将包含多少数据。

我还想建议对大数据集使用 ORC 或 Parquet 数据格式,因为它们绝对更适合存储大数据集。

例如镶木地板:

joinDesrdd_df.withColumn("par_id",col('id_source')%50). \
 repartition(50, 'par_id').write.format('parquet'). \
 save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

将其读回数据框:

df = spark.read. \
 parquet("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

【讨论】:

  • 我想试试你的建议,但在这种情况下,如何重新读取所有生成的分区。假设我想另存为镶木地板,你的代码行应该如何
  • @bib.. 这篇文章对从 parquet 文件中读取分区数据有一些很好的答案。 stackoverflow.com/questions/33650421/…
  • 使用上面建议的重新分区方法,我仍然进入oom。
  • 您的数据是否在分区列上倾斜?
  • 您能否给我们看看代码,即;在将其写入磁盘之前如何创建该大数据帧
猜你喜欢
  • 2018-05-03
  • 1970-01-01
  • 2021-05-03
  • 2020-06-15
  • 2016-06-27
  • 2019-09-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多