【问题标题】:Why is groupBy() a lot faster than distinct() in pyspark?为什么在 pyspark 中 groupBy() 比 distinct() 快很多?
【发布时间】:2018-09-11 10:49:14
【问题描述】:

当我将 spark 数据帧上的 distinct() 替换为 groupBy() 时,我的 pyspark 代码的性能得到了很大提升。但我无法理解其背后的原因。 整个意图是从数据框中删除行级重复项。

我尝试在 pyspark 中搜索 groupBy()distinct() 的实现,但找不到。

有人可以解释或指出正确的解释方向吗?

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    我最近关注了 Apache Spark SQL 中 GROUP BYDISTINCT 操作之间的区别。碰巧...有时两者可能相同!

    要查看此内容,请运行以下代码并检查执行计划:

    (0 to 10).map(id => (s"id#${id}", s"login${id % 25}"))
       .toDF("id", "login").createTempView("users")
    
    sparkSession.sql("SELECT login FROM users GROUP BY login").explain(true)
    sparkSession.sql("SELECT DISTINCT(login) FROM users").explain(true)
    

    惊喜,惊喜!计划应如下所示:

    == Physical Plan ==
    *(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
    +- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#33]
       +- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
          +- *(1) LocalTableScan [login#8]
    

    为什么?由于您应该在日志中看到ReplaceDistinctWithAggregate 规则:

    === Applying Rule org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate ===
    !Distinct                     Aggregate [login#8], [login#8]
     +- LocalRelation [login#8]   +- LocalRelation [login#8]
               (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:65)
    

    ============================ 更新:

    对于更复杂的查询(例如使用聚合),可能会有所不同。

    sparkSession.sql("SELECT COUNT(login) FROM users GROUP BY login").explain(true)
    sparkSession.sql("SELECT COUNT(DISTINCT(login)) FROM users").explain(true)
    

    GROUP BY 版本生成一个只有一次 shuffle 的计划:

    == Physical Plan ==
    *(2) HashAggregate(keys=[login#8], functions=[count(login#8)], output=[count(login)#12L])
    +- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#16]
       +- *(1) HashAggregate(keys=[login#8], functions=[partial_count(login#8)], output=[login#8, count#15L])
          +- *(1) LocalTableScan [login#8]
    

    而带有 DISTINCT 的版本会生成 2 次随机播放。第一个用于对登录进行重复数据删除,第二个用于对登录进行计数:

    == Physical Plan ==
    *(3) HashAggregate(keys=[], functions=[count(distinct login#8)], output=[count(DISTINCT login)#17L])
    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#48]
       +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct login#8)], output=[count#21L])
          +- *(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
             +- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#43]
                +- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
                   +- *(1) LocalTableScan [login#8]
    

    但是,从语义上讲,这些查询并不相同,因为第一个查询生成登录组,而第二个查询也计算它们。它解释了额外的随机播放步骤。

    在更改之前/之后使用代码回答问题可能会更容易。 @pri,你有它以便我们分析 PySpark 执行的计划吗?

    【讨论】:

    • 那么如何解释观察结果呢?
    • 分析 Spark UI,也许除了通过改变来区分不同与组之外,还有其他的东西吗?也许它适用于特定的数据集或查询?也许这只是来自 Hive (stackoverflow.com/a/31876286/9726075) 的“意见”?不知道。如果我发现新的东西,我会分享。
    【解决方案2】:

    distinct() 实现检查每一列,如果两行或多行完全相同,则保留第一行。 我想这就是为什么 distinct 这么慢的主要原因。

    Check this topic too.

    【讨论】:

    • 感谢您的回答,但我没有找到任何使 groupBy 比 distinct() 更快的原因。
    • 如果您有 100 多列,则 spark 作业应比较所有列。 groupBy 适用于一个(通常)列, distinct() 检查适用于所有列。试想一下,您已经检查了所有列,任务是多么困难。
    • 如果我想删除行级重复项,那么我也将包括 groupBy 中的所有列,所以这没关系。
    • 如果你使用 groupby() 执行器将进行分组,在将组发送到只按组执行总和、计数等的主服务器之后,但是 distinct() 检查 executors() 中的每一列和在执行程序将不同的数据帧发送到主服务器后尝试删除重复项,然后主服务器再次检查所有列的不同值。我认为这是主要原因。
    猜你喜欢
    • 1970-01-01
    • 2011-05-01
    • 1970-01-01
    • 2018-03-22
    • 1970-01-01
    • 2021-01-08
    • 2015-03-07
    • 2016-11-12
    • 2014-05-28
    相关资源
    最近更新 更多