【发布时间】:2018-01-24 19:19:57
【问题描述】:
我使用的是 Spark 2.1.0
当我尝试在数据框上使用窗口函数时
val winspec = Window.partitionBy("partition_column")
DF.withColumn("column", avg(DF("col_name")).over(winspec))
我的计划发生了变化,并将以下几行添加到物理计划中,由于这个额外的阶段,正在发生额外的洗牌,并且数据是巨大的,这会像任何事情一样减慢我的查询并运行数小时。
+- Window [avg(cast(someColumn#262 as double)) windowspecdefinition(partition_column#460, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS someColumn#263], [partition_column#460]
+- *Sort [partition_column#460 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(partition_column#460, 200)
我也将舞台视为 MapInternalPartition,我认为它是内部分区的。现在我不知道这是什么。但是因为我认为因此即使我的 100 个任务花费了 30 多分钟,其中 99 个任务在 1-2 分钟内完成,最后 1 个任务花费了剩下的 30 分钟,使我的集群 IDLE 没有并行处理,这让我认为这是使用Window函数时数据分区正确???
我尝试通过将其转换为 RDD 来应用 HashPartitioning...因为我们无法在 Dataframe 上应用 Custom / HashPartitioner
所以如果我这样做:
val myVal = DF.rdd.partitioner(new HashPartitioner(10000))
我得到一个 ANY 的返回类型,我没有得到任何要执行的操作列表。
我检查并看到在窗口函数中发生分区的列包含所有 NULL 值
【问题讨论】:
-
您的数据似乎严重倾斜
标签: scala apache-spark apache-spark-sql databricks