【问题标题】:Unable to Convert to DataFrame from RDD after applying partitioning应用分区后无法从 RDD 转换为 DataFrame
【发布时间】:2018-01-24 19:19:57
【问题描述】:

我使用的是 Spark 2.1.0

当我尝试在数据框上使用窗口函数时

val winspec = Window.partitionBy("partition_column")
DF.withColumn("column", avg(DF("col_name")).over(winspec))

我的计划发生了变化,并将以下几行添加到物理计划中,由于这个额外的阶段,正在发生额外的洗牌,并且数据是巨大的,这会像任何事情一样减慢我的查询并运行数小时。

+- Window [avg(cast(someColumn#262 as double)) windowspecdefinition(partition_column#460, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS someColumn#263], [partition_column#460]
   +- *Sort [partition_column#460 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(partition_column#460, 200)

我也将舞台视为 MapInternalPartition,我认为它是内部分区的。现在我不知道这是什么。但是因为我认为因此即使我的 100 个任务花费了 30 多分钟,其中 99 个任务在 1-2 分钟内完成,最后 1 个任务花费了剩下的 30 分钟,使我的集群 IDLE 没有并行处理,这让我认为这是使用Window函数时数据分区正确???

我尝试通过将其转换为 RDD 来应用 HashPartitioning...因为我们无法在 Dataframe 上应用 Custom / HashPartitioner

所以如果我这样做:

val myVal = DF.rdd.partitioner(new HashPartitioner(10000))

我得到一个 ANY 的返回类型,我没有得到任何要执行的操作列表。

我检查并看到在窗口函数中发生分区的列包含所有 NULL 值

【问题讨论】:

  • 您的数据似乎严重倾斜

标签: scala apache-spark apache-spark-sql databricks


【解决方案1】:

TL;DR

  1. 使用窗口函数时的随机播放不是额外。它是正确性所必需的,不能被删除。
  2. 应用分区程序随机播放。
  3. 数据集不能重复使用 RDD 分区。使用Dataset 进行分区应该使用repartition 方法:

    df.repartition($"col_name")
    
  4. 但它对你没有帮助,因为 2)

  5. 还有这个:

    val myVal = DF.rdd.partitioner(new HashPartitioner(10000))
    

    不会返回Any。它不会编译,因为 RDD 没有 partitioner 方法,它接受参数。

    正确的方法是partitionBy,但不适用于RDD[Row],因为3)对你没有帮助。

如果内存够大可以试试

df.join(
  broadcast(df.groupBy("partition_column").agg(avg(DF("col_name"))),
  Seq("partition_column")
)

编辑

如果您尝试计算运行平均值(avgWindow.partitionBy("partition_column") 按组计算全局平均值,而不是运行平均值),那么您就不走运了。

如果分区列只有NULLS,则任务不是分布式且完全顺序的。

要计算全局运行平均值,您可以尝试应用类似于 How to compute cumulative sum using Spark 的逻辑。

【讨论】:

  • 什么是顺序依赖,你能解释一下吗?你在这里看到什么影响..?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-05-29
  • 1970-01-01
  • 2020-01-24
  • 1970-01-01
  • 1970-01-01
  • 2023-03-13
  • 2016-05-29
相关资源
最近更新 更多