【问题标题】:Difference in Spark SQL Shuffle partitionsSpark SQL Shuffle 分区的区别
【发布时间】:2020-10-11 19:45:18
【问题描述】:

我正在尝试了解默认设置为 200 的 Spark Sql Shuffle Partitions。 数据如下所示,后面是为这两种情况创建的分区数。

scala> flightData2015.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+

scala> println(flightData2015.sort("DEST_COUNTRY_NAME").rdd.getNumPartitions)
104

scala> println(flightData2015.groupBy("DEST_COUNTRY_NAME").count().rdd.getNumPartitions)
200

这两种情况都会导致一个 Shuffle 阶段,这应该会产生 200 个分区(默认值)。有人能解释一下为什么会有区别吗?

【问题讨论】:

    标签: scala apache-spark pyspark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    根据 Spark 文档,有两种重新分区数据的方法。一种是通过此配置 spark.sql.shuffle.partitions 作为默认值 200,并且始终在您运行任何连接或聚合时应用,如您所见 here

    当我们谈论sort() 时,这并不是那么简单,Spark 使用计划器来确定数据集中数据的倾斜程度。如果它不是太偏斜,而不是使用sort-merge join,这将导致 200 个分区,如您预期的那样,它更愿意在分区之间执行broadcast 的数据以避免完全洗牌。这可以在排序期间节省时间以减少网络流量更多详情here

    【讨论】:

      【解决方案2】:

      这两种情况的区别在于sortgroupBy 在底层使用了不同的分区器。

      1. groupBy - 使用hashPartitioning,这意味着它计算密钥的哈希值,然后计算pmod 200(或设置为随机分区数的任何值),因此它将始终创建 200 个分区(即使有些其中可能为空)
      2. sort/orderBy - 正在使用 rangePartitioning,这意味着它运行一个单独的作业来对数据进行采样,并在此基础上为分区创建边界(试图使它们成为 200)。现在根据采样数据分布和实际行数,它可能会创建小于 200 的边界,这就是您只得到 104 的原因。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-06-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-03-06
        • 1970-01-01
        • 2019-08-26
        相关资源
        最近更新 更多