【问题标题】:Writing DataFrame to Parquet or Delta Does not Seem to be Parallelized - Taking Too Long将 DataFrame 写入 Parquet 或 Delta 似乎没有被并行化 - 耗时太长
【发布时间】:2020-05-14 01:33:51
【问题描述】:

问题陈述

我已将已分区的 CSV 文件读入 Spark Dataframe。

为了利用增量表的改进,我尝试将其简单地导出为 Azure Data Lake Storage Gen2 内的目录中的增量。我在 Databricks 笔记本中使用以下代码:

%scala

df_nyc_taxi.write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

整个数据帧大约有 160 GB。

硬件规格

我正在使用具有 12 个内核和 42 GB RAM 的集群运行此代码。

但是看起来整个写入过程是由 Spark/Databricks 按顺序处理的,例如非平行时尚

DAG 可视化如下所示:

总而言之,这似乎需要 1-2 小时才能执行。

问题

  • 有没有办法让 Spark 真正并行写入不同的分区?
  • 会不会是我尝试将增量表直接写入 Azure Data Lake Storage 的问题?

【问题讨论】:

  • 试试repartition(your_partition_columns).write.partitionBy("year", "month")
  • 感谢@eliasah 的输入。重新分区不需要一个整数而不是一个列列表吗?
  • 当我尝试时:df_nyc_taxi.repartition("year", "month").write.partitionBy("year", "month").format("delta").save("/mnt /delta/") 我得到:错误:使用替代方法重载方法值重新分区:(partitionExprs: org.apache.spark.sql.Column*)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row ] (numPartitions: Int,partitionExprs: org.apache.spark.sql.Column*)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] 不能应用于 (String, String ) df_nyc_taxi.repartition("year", "month").write.partitionBy("year", "month").format("delta").save("/mnt/delta/")
  • repartition 可以将列列表作为重复参数。检查这个:stackoverflow.com/questions/52521067/…
  • 很好,会检查的。谢谢

标签: scala apache-spark databricks azure-data-lake azure-databricks


【解决方案1】:

要跟进@eliasah 的评论,也许你可以试试这个:

import org.apache.spark.sql.functions
df_nyc_taxi.repartition(col("year"), col("month"), lit(rand() * 200)).write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

@eliasah 的答案很可能会为每个目录“/mnt/delta/year=XX/month=XX”创建一个文件,并且只有一个工作人员将数据写入每个文件。额外的列将进一步对数据进行切片(在这种情况下,我将每个原始文件中的数据划分为 200 个较小的分区,您可以根据需要对其进行编辑),以便更多的工作人员可以并发写入。

P.S:对不起,我还没有足够的代表发表评论:'D

【讨论】:

    【解决方案2】:

    这与其他答案类似,但是,我在重新分区后和编写它之前添加了一个持久化。持久化将进入内存并休息(内存已满后剩余)将溢出到磁盘,这仍然比再次读取要快。过去对我来说效果很好。我选择了 1250 个分区,因为 128mb 是我常用的分区大小。 Spark 之所以成为现在这样是因为内存中的计算,因此最好在有机会时应用它。

    from pyspark.sql import functions as F
    df_nyc_taxi.repartition(1250,F.col("year"), col("month"))\
    .persist(StorageLevel.MEMORY_AND_DISK).write.partitionBy("year", "month")\
    .format("delta").save("/mnt/delta/")
    

    【讨论】:

      猜你喜欢
      • 2020-04-10
      • 2018-11-27
      • 2022-01-16
      • 1970-01-01
      • 2021-05-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-19
      相关资源
      最近更新 更多