【问题标题】:Spark dataframe groupby mean and median does not completeSpark dataframe groupby均值和中位数未完成
【发布时间】:2018-08-18 14:14:57
【问题描述】:

我正在使用 Spark sql 数据帧执行 groupby 操作,然后计算每个组的数据的平均值和中位数。原始数据量约为 1 TB。

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"), 
        avg("Error").as("MeanError"), 
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"), 
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"), 
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
    filter($"Count" > 1000)


df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

当我运行该查询时,我的工作被卡住并且没有完成。我该如何调试问题?是否存在导致groupby() 卡住的关键不平衡?

【问题讨论】:

  • 什么是callUDF函数?它是一个聚合函数吗?可以看看源代码吗?
  • @RameshMaharjan org.apache.spark.sql.functions.callUDF 是 spark 的内置函数
  • df.rdd.count 完成了吗?如果不是,那么问题已经在 groupBy 之前了。您可以检查 SparkUI 以查看您的代码卡在哪里。我猜记忆可能是个问题。您可以尝试增加spark.sql.shuffle.partitions,这会增加您在 groupBy 期间洗牌的分区数量(默认为 200),但如果您的数据严重倾斜,这将无济于事,即一个 id 有太多行
  • 用户定义函数"percentile_approx"注册了吗?如果不是,那么这个 UDF 可能只在驱动程序上定义,这可能会导致在groupby 期间将大量数据推回驱动程序。
  • @davidrpugh percentile_approx 是 Hive 内置聚合函数

标签: apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

在 cmets 中已经有很多明智的建议,但我的想法值得在这里:

1) df.count 有效吗?如果没有,您的问题出在您发布的代码之前(如 cmets 中所建议)

2) 查看 Spark UI(如 cmets 中所建议的) - 大多数任务是否快速完成,而少数任务需要很长时间/似乎卡住了?如果是这样,偏斜可能是您的问题

3) 您可能会重写您的查询,首先只找到每个“id”的“计数”。接下来过滤您的原始 df 以仅包含通过广播(以避免 df 的洗牌)内部连接(如果没有太多 1000 次以上的 id)出现超过 1000 次的行。然后聚合这个较小的数据框并计算所有统计数据。如果计数聚合有效,则输出还应显示是否存在任何明显的数据倾斜!

4) 有时将计算分解为更小的步骤,然后写入然后立即从磁盘读取,这帮助我过去完成了一些尴尬的工作。如果一开始生成 df 的成本很高,也可以加快调试速度。

5) 绝对值得升级 spark.sql.shuffle.partitions(如 cmets 中所建议的); 2001 是 spark 中的一个神奇数字 (What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?)

6)我也会尝试改变数据量,如果你只使用星期几 = 1(如 cmets 中的建议),它是否有效

7) 查询是否在没有 percentile_approx 的情况下运行?

【讨论】:

  • (1) 是的,原始 df 上的 df.count 工作正常。 (7) 如果我删除percentile_approx 语句,查询不会始终完成。一般来说,我的原始查询有 50% 的时间完成,而另外 50% 的时间,系统管理员会因为我阻止集群而对我大喊大叫。
  • (5) 是否有关于分区和分区最佳实践的“官方”文档?喜欢 Databricks 或 Cloudera 的文档?我已经搜索过,但找不到任何确定的内容。
  • 我在快速搜索后找不到任何东西。似乎有一种模糊的共识,即如果任务花费的时间少于 100 毫秒,那么您的分区就太多了。
  • 如果您的原始查询在 50% 的时间内完成,这表明您的集群内发生了一些变化?您是否每次都使用相同的选项提交?集群上是否启用了动态分配?
  • 我认为作业完成 50% 的时间是由于对不同的输入数据运行查询。也许有时在groupby 期间有太多数据到达一个节点?
猜你喜欢
  • 2021-09-06
  • 1970-01-01
  • 2022-11-12
  • 1970-01-01
  • 2018-01-31
  • 2019-08-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多