【问题标题】:Why does Apache Spark partitions CSV read based on the file size and how do I change the partitions?为什么 Apache Spark 会根据文件大小读取 CSV 分区以及如何更改分区?
【发布时间】:2020-12-10 12:49:21
【问题描述】:

这是我的 pyspark 代码:

csv_file = "/FileStore/tables/mnt/training/departuredelays02.csv"
schema   = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df = (spark
     .read
     .format("csv")                    
     .option("header","true")
     .schema(schema)
     .load(csv_file)                  
        )
partitions = df.rdd.getNumPartitions()
print(partitions)

csv 文件有 487178 行。

打印分区后,我得到的结果是 3 个分区。

请注意,我有 2 个 4 个核心的工人。这意味着总共有 8 个插槽。

现在,如果我尝试加载以下文件,该文件更大,有 1391578 行:

csv_file = "/FileStore/tables/mnt/training/departuredelays.csv"

我得到 8 个分区。

我的问题是如何强制第一个 CSV 以与较大文件相同的方式进行分区。 我知道可以使用重新分区,但我很想知道这是否可以在没有任何洗牌的情况下完成?即使我们重新分区,它似乎也创建了一个包含 3 个任务而不是 8 个任务的作业。

这是我运行以下代码 sn-p 后得到的结果:

df = df.repartition(8)
print(df.count())

第一个任务的第一阶段仍然分配了3个任务。

输出:

(3) Spark Jobs
Job 93 View(Stages: 1/1)
Stage 123: 3/3
Job 94 View(Stages: 1/1, 1 skipped)
Stage 124: 0/3 skipped
Stage 125: 8/8
Job 95 View(Stages: 1/1, 2 skipped)
Stage 126: 0/3 skipped
Stage 127: 0/8 skipped
Stage 128: 1/1

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    您可以尝试使用coalesce,这会比重新分区更合理。

    df = spark
         .read
         .format("csv")                    
         .option("header","true")
         .schema(schema)
         .load(csv_file)                  
         .coalesce(8)
    

    查看这里了解更多信息 Spark - repartition() vs coalesce()

    【讨论】:

    • 感谢您的回复 Mani!...我想我的问题不太清楚...虽然 Coalesce 将有助于避免洗牌,但我的主要目标是了解 Spark 创建的原因小文件的 CSV 读入 3 个任务,大文件的 8 个任务?...还想知道是否可以强制使用 8 个任务读取小文件?...如果我有相同的小 csv 文件拆分为 8 个文件,Spark 自动使用 8 个任务。它对 1391578 行单个 CSV 文件也是如此。 Stackoverflow 中的很多答案确实为 RDD.textfile 回答了这个问题,但对 Dataframe CSV 加载没有回答。
    猜你喜欢
    • 1970-01-01
    • 2020-09-02
    • 1970-01-01
    • 2018-10-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-03
    • 2019-01-04
    相关资源
    最近更新 更多