【问题标题】:Number of dataframe partitions after sorting?排序后的数据框分区数?
【发布时间】:2019-05-16 02:32:47
【问题描述】:

使用orderBy后spark如何确定分区数?我一直认为生成的数据框有spark.sql.shuffle.partitions,但这似乎不是真的:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 

在这两种情况下,spark都是+- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200),那么第二种情况下得到的分区数怎么会是2呢?

【问题讨论】:

  • 我将两者与explain extended进行了比较,这给了我所有的计划......所有计划都在同一条路径上,但唯一的区别是i字符串和n是数字。在 spark 内部进行 RangePartitioning 时....根据数据类型,它似乎有所不同。
  • 我注意到的一件事是,如果您使用df.orderBy($"n",$"i"),那么分区长度又是 200。所以这完全基于数据类型,发生在 spark api 中。
  • 根据您的示例分别使用 a、b、c、d 联合 4 个 df 时获得 5 个。代码中一定有答案。

标签: apache-spark apache-spark-sql


【解决方案1】:

spark.sql.shuffle.partitions 用作上限。最终的分区数是1 <= partitions <= spark.sql.shuffle.partition


正如您所提到的,Spark 中的排序通过RangePartitioner。它试图实现的是将您的数据集划分为大致相等范围的指定数量(spark.sql.shuffle.partition)。

保证在分区后相同的值将在同一个分区中。值得查看RangePartitioning(不是公共 API 的一部分)类文档:

...

ordering 中的表达式计算为相同值的所有行将位于同一分区中

如果不同排序值的数量小于所需的分区数,即可能的范围数小于spark.sql.shuffle.partition,您最终会得到更少的分区数。另外,这里引用RangePartitionerScaladoc 的一段话:

RangePartitioner 创建的实际分区数可能 与 partitions 参数不同,如果 采样记录数小于分区值。

回到您的示例,n 是一个常量 ("a"),无法进行分区。另一方面,i 可以有 10,000 个可能的值,并被划分为 200 个 (=spark.sql.shuffle.partition) 范围或分区。

请注意,这仅适用于 DataFrame/Dataset API。当使用 RDD 的sortByKey 时,可以明确指定分区数,或者 Spark 将使用当前的分区数。

另见:

【讨论】:

  • 他有 2 个分区用于常量 a。否则为黄色状态的第一个块。
  • @thebluephantom 2 个分区,但所有 都将在第一个分区中结束。您可以通过df.orderBy("n").rdd.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter else Iterator()).count() 查询
  • 所以,第二个分区是空的。这让人放心,他问的问题是为什么 2 个分区,1 个空呢?我得到了 5 个具有 4 个不同值的值。那时看起来不稳定
【解决方案2】:

除了查看 Range Partitioning for Sorting 之外,我还进行了各种测试,以便更凭经验了解这一点——这是问题的症结所在。见How does range partitioner work in Spark?

已经尝试了问题示例中的“n”的 1 个不同值,以及“n”的超过 1 个这样的不同值,然后使用各种数据帧大小与 df.orderBy($" n"):

  • 很明显,计算 随后通过 mapPartitions 确定将包含用于排序的数据范围的分区数
  • 它基于在为这些计算范围计算一些启发式最佳分区数之前从现有分区中采样,
  • 在大多数情况下会计算并因此生成 N+1 个分区,其中 分区 N+1 为空

分配的额外分区几乎总是空的这一事实让我认为编码中在某种程度上存在计算错误,换句话说就是一个小错误恕我直言。

我基于以下简单测试,该测试确实返回了我认为是正确的分区数的 RR:

val df_a1 = (1 to 1).map(i => ("a",i)).toDF("n","i").cache
val df_a2 = (1 to 1).map(i => ("b",i)).toDF("n","i").cache
val df_a3 = (1 to 1).map(i => ("c",i)).toDF("n","i").cache
val df_b = df_a1.union(df_a2)
val df_c = df_b.union(df_a3)

df_c.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
 .toDF("partition_number","number_of_records")
 .show(100,false)

返回:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |1                |
|1               |1                |
|2               |1                |
+----------------+-----------------+

这个边界示例计算相当简单。只要我将 1 到 21 .. N 用于任何“n”,就会产生额外的空分区:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |2                |
|1               |1                |
|2               |1                |
|3               |0                |
+----------------+-----------------+

排序要求给定“n”或一组“n”的所有数据位于同一分区中。

【讨论】:

    猜你喜欢
    • 2021-12-08
    • 1970-01-01
    • 1970-01-01
    • 2018-09-10
    • 2020-08-30
    • 2016-01-14
    • 1970-01-01
    • 2019-07-05
    相关资源
    最近更新 更多