【问题标题】:Spark SQL window function causes skew in data distributionSpark SQL 窗口函数导致数据分布偏斜
【发布时间】:2017-10-28 16:58:12
【问题描述】:

由于数据分布不均,此 Spark SQL 查询的性能很差:

select c.*, coalesce(
      sum(revenue)
        OVER (PARTITION BY cid, pid, code
        ORDER BY (cTime div (1000*3600))
        RANGE BETWEEN 336 PRECEDING and 1 PRECEDING), 0L) as totalRevenue
  from records c

如果我增加扫描范围,我在 SparkUI 中看到单个任务堆栈和集群失败。

我在 AWS EMR 中使用 Yarn 和 Spark 2.2.0

我该如何克服这个问题? 谢谢

【问题讨论】:

  • 您能否添加有关失败的信息?大概是 OutOfMemeoryError 吧?
  • ExecutorLostFailure(执行器 8 因正在运行的任务之一而退出) 原因:容器因超出内存限制而被 YARN 杀死。使用了 21.8 GB 的 21.7 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead
  • 你能计算每个cidpidcode 的行数吗?可以分享一下执行计划吗? Spark 的版本是多少?这是纱线吗?在您的问题中添加整个堆栈跟踪。添加问题的所有答案(编辑并添加我要求的内容)。谢谢!
  • 谢谢,@JacekLaskowski,我添加了版本。我可以通过任务持续时间和最终文件大小(我在最后写入文件)看到 pid、cid、代码的一种组合明显大于其他组合。省略它们会提高性能,但我不能在生产中这样做。
  • 谢谢@EdKohlwey 你能澄清一下吗?

标签: apache-spark apache-spark-sql


【解决方案1】:

我只能推荐几种方法来缓解您的病情进行调查。我实际上会尝试两种不首先处理偏斜的方法:

  • 尝试增加每条消息的执行程序内存。在 YARN 上,您可能还需要增加最大容器内存。 Spark IIRC 上的默认值为 2gb,需要增加它并不少见。
  • 尝试切换到 memory_and_disk 或 disk_only 持久性级别。我相信这应该适用于您的查询,尽管可能很难看到完整的查询计划

这样做的原因是,至少在我看来,您的数据从根本上是有偏差的。如果您开始重塑数据以特定方式解决与当前数据形状的偏差,那么您将面临维护困难,因为数据的形状可能会随着时间而改变。在我看来,至少您希望尽可能长时间地保留最直接的查询实现,并且仅在遇到违反 SLA 等问题时​​才以编程方式优化倾斜问题。

如果这些都不起作用,那么您可以尝试直接解决偏差。 为此,一个简单的方法是创建第三列,该列由已知有问题的列值的随机数填充。使用它作为键执行一次求和操作,然后删除额外的随机列进行第二次遍历。或者,您可以执行两个查询并将它们连接起来:一个带有随机数的偏斜数据(仍必须分两次处理),另一个用于无问题数据的未更改查询。

编辑 - 通过两帧计算部分和

这里最有用的观察是加法是可交换的和关联的。我最初基于随机数的建议不起作用,但这会起作用。基本上,您想在几个部分中计算所需帧的部分总和。最简单的方法可能是作为一组范围(为了简单起见,这里使用了两个):

create temporary table partial_revenue_1 as select c.*, coalesce(
      sum(revenue)
        OVER (PARTITION BY cid, pid, code
        ORDER BY (cTime div (1000*3600))
        RANGE BETWEEN 336 PRECEDING and 118 PRECEDING), 0L) as partialTotalRevenue
  from records c

create temporary table partial_revenue_2 as select c.*, coalesce(
      sum(revenue)
        OVER (PARTITION BY cid, pid, code
        ORDER BY (cTime div (1000*3600))
        RANGE BETWEEN 117 PRECEDING and 1 PRECEDING), 0L) as partialTotalRevenue
  from records c

create temporary table combined_partials as select * from
    partial_reveneue_1 union all select * from partial_revenue_2

select sum(partialTotalRevenue), first(c.some_col) ... from 
    combined_partials c group by cid, pid, code

请注意,您需要使用 first 聚合函数来剔除您将在 records 表上的早期 select * 操作中拥有的重复字段。不用担心,这会很好,因为这两个值都来自同一个表。

【讨论】:

  • 感谢 Ed,我现在将专注于解决倾斜问题。您能请教一下您所说的“第三”栏吗?也许是一个小例子
  • LeonB 酷。您了解我为此概述的方法吗?是否需要进一步澄清?
  • 我将不胜感激。谢谢
  • 第二遍我应该使用reduceByKey,因为它先在本地工作,然后再随机播放,否则我会遇到同样的问题?
  • 使用随机列求和会干扰窗口函数,因此相关键会分开。
猜你喜欢
  • 1970-01-01
  • 2017-04-30
  • 1970-01-01
  • 1970-01-01
  • 2018-01-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多