【问题标题】:Why do I get so many empty partitions when repartionning a Spark Dataframe?为什么在重新分区 Spark Dataframe 时会得到这么多空分区?
【发布时间】:2018-11-14 15:08:53
【问题描述】:

我想将数据框“df1”划分为 3 列。对于这 3 列,该数据框正好有 990 个独特的组合:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

为了优化这个数据帧的处理,我想对 df1 进行分区以获得 990 个分区,每个关键可能性一个:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

我写了一个简单的方法来计算每个分区中的行数:

In [22]: def f(iterator):
    ...:     a = 0
    ...:     for partition in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)

我注意到,实际上我得到的是 628 个具有一个或多个键值的分区,以及 362 个空分区。

我假设 spark 会以均匀的方式重新分区(1 个键值 = 1 个分区),但这看起来不像,我觉得这种重新分区会增加数据倾斜,即使它应该是相反的方式......

Spark 使用什么算法对列上的数据框进行分区? 有没有办法实现我认为可能的?

我在 Cloudera 上使用 Spark 2.2.0。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql partitioning


    【解决方案1】:

    要跨分区分布数据,火花需要以某种方式将列的值转换为分区的索引。 Spark 中有两个默认的分区器——HashPartitioner 和 RangePartitioner。 Spark 中的不同转换可以应用不同的分区器 - 例如join 将应用哈希分区器。

    基本上,哈希分区公式将值转换为分区索引将是value.hashCode() % numOfPartitions。在您的情况下,多个值映射到同一个分区索引。

    如果您想要更好的分布,您可以实现自己的分区器。更多关于它的信息是 hereherehere

    【讨论】:

    • 我成功地使用this resource 实现了一个自定义分区器但是,当数据最初位于数据帧中时,使用 RDD API 似乎会产生巨大的“转换”成本......因为我不能t 使用数据框 API 创建 CustomPartitioner,似乎我被 HashPartitioner 困住了,我最好的做法是减少分区数量以获得“均匀”分布。
    • @tomcat 更改 RDD 中的分区对于 Dataframe 来说应该不是问题。如果我的陈述有误,请纠正我。 here 所有数据帧都只是 rdds 的高级 api。你能在它说有转换成本的地方添加资源吗?
    猜你喜欢
    • 1970-01-01
    • 2019-03-02
    • 1970-01-01
    • 1970-01-01
    • 2019-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多