【发布时间】:2020-12-10 12:49:21
【问题描述】:
这是我的 pyspark 代码:
csv_file = "/FileStore/tables/mnt/training/departuredelays02.csv"
schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df = (spark
.read
.format("csv")
.option("header","true")
.schema(schema)
.load(csv_file)
)
partitions = df.rdd.getNumPartitions()
print(partitions)
csv 文件有 487178 行。
打印分区后,我得到的结果是 3 个分区。
请注意,我有 2 个 4 个核心的工人。这意味着总共有 8 个插槽。
现在,如果我尝试加载以下文件,该文件更大,有 1391578 行:
csv_file = "/FileStore/tables/mnt/training/departuredelays.csv"
我得到 8 个分区。
我的问题是如何强制第一个 CSV 以与较大文件相同的方式进行分区。 我知道可以使用重新分区,但我很想知道这是否可以在没有任何洗牌的情况下完成?即使我们重新分区,它似乎也创建了一个包含 3 个任务而不是 8 个任务的作业。
这是我运行以下代码 sn-p 后得到的结果:
df = df.repartition(8)
print(df.count())
第一个任务的第一阶段仍然分配了3个任务。
输出:
(3) Spark Jobs
Job 93 View(Stages: 1/1)
Stage 123: 3/3
Job 94 View(Stages: 1/1, 1 skipped)
Stage 124: 0/3 skipped
Stage 125: 8/8
Job 95 View(Stages: 1/1, 2 skipped)
Stage 126: 0/3 skipped
Stage 127: 0/8 skipped
Stage 128: 1/1
【问题讨论】:
标签: apache-spark pyspark