【发布时间】:2018-09-11 10:49:14
【问题描述】:
当我将 spark 数据帧上的 distinct() 替换为 groupBy() 时,我的 pyspark 代码的性能得到了很大提升。但我无法理解其背后的原因。
整个意图是从数据框中删除行级重复项。
我尝试在 pyspark 中搜索 groupBy() 和 distinct() 的实现,但找不到。
有人可以解释或指出正确的解释方向吗?
【问题讨论】:
标签: pyspark
当我将 spark 数据帧上的 distinct() 替换为 groupBy() 时,我的 pyspark 代码的性能得到了很大提升。但我无法理解其背后的原因。
整个意图是从数据框中删除行级重复项。
我尝试在 pyspark 中搜索 groupBy() 和 distinct() 的实现,但找不到。
有人可以解释或指出正确的解释方向吗?
【问题讨论】:
标签: pyspark
我最近关注了 Apache Spark SQL 中 GROUP BY 和 DISTINCT 操作之间的区别。碰巧...有时两者可能相同!
要查看此内容,请运行以下代码并检查执行计划:
(0 to 10).map(id => (s"id#${id}", s"login${id % 25}"))
.toDF("id", "login").createTempView("users")
sparkSession.sql("SELECT login FROM users GROUP BY login").explain(true)
sparkSession.sql("SELECT DISTINCT(login) FROM users").explain(true)
惊喜,惊喜!计划应如下所示:
== Physical Plan ==
*(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#33]
+- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- *(1) LocalTableScan [login#8]
为什么?由于您应该在日志中看到ReplaceDistinctWithAggregate 规则:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate ===
!Distinct Aggregate [login#8], [login#8]
+- LocalRelation [login#8] +- LocalRelation [login#8]
(org.apache.spark.sql.catalyst.rules.PlanChangeLogger:65)
============================ 更新:
对于更复杂的查询(例如使用聚合),可能会有所不同。
sparkSession.sql("SELECT COUNT(login) FROM users GROUP BY login").explain(true)
sparkSession.sql("SELECT COUNT(DISTINCT(login)) FROM users").explain(true)
GROUP BY 版本生成一个只有一次 shuffle 的计划:
== Physical Plan ==
*(2) HashAggregate(keys=[login#8], functions=[count(login#8)], output=[count(login)#12L])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#16]
+- *(1) HashAggregate(keys=[login#8], functions=[partial_count(login#8)], output=[login#8, count#15L])
+- *(1) LocalTableScan [login#8]
而带有 DISTINCT 的版本会生成 2 次随机播放。第一个用于对登录进行重复数据删除,第二个用于对登录进行计数:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(distinct login#8)], output=[count(DISTINCT login)#17L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#48]
+- *(2) HashAggregate(keys=[], functions=[partial_count(distinct login#8)], output=[count#21L])
+- *(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#43]
+- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- *(1) LocalTableScan [login#8]
但是,从语义上讲,这些查询并不相同,因为第一个查询生成登录组,而第二个查询也计算它们。它解释了额外的随机播放步骤。
在更改之前/之后使用代码回答问题可能会更容易。 @pri,你有它以便我们分析 PySpark 执行的计划吗?
【讨论】:
distinct() 实现检查每一列,如果两行或多行完全相同,则保留第一行。 我想这就是为什么 distinct 这么慢的主要原因。
【讨论】: