【问题标题】:How to optimize window aggregation over large windows?如何优化大窗口上的窗口聚合?
【发布时间】:2020-01-15 10:14:59
【问题描述】:

我在 spark 2.4.4 中使用带有大窗口的窗口函数,例如。

Window
  .partitionBy("id")
  .orderBy("timestamp")

在我的测试中,我有大约 70 个不同的 ID,但我可能有大约 200 000 行的 ID。如果没有进一步的配置,我必须为我的执行程序分配大量内存以避免这种 OOM:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

查看源码,发现了这个参数,完全没有文档记录:

spark.sql.windowExec.buffer.in.memory.threshold

给它一个大尺寸(例如 1.000.000),我不再需要那么多内存了。据我了解,这是缓冲的行数;我猜想增加这个参数不会重复执行器内存中的行,但这对我来说并不是很清楚。

有人能准确解释一下执行者端是如何处理窗口的吗?为什么数据会重复?如何避免这种重复并使过程更快,每个窗口中有很多行?可以使用哪些参数?

谢谢。

【问题讨论】:

  • 也遇到了这个问题。你找到解决方案了吗?

标签: scala apache-spark apache-spark-sql


【解决方案1】:

我发现了这个参数,根本没有记录:

这是一个内部配置属性。

通过阅读源代码,我设法“收集”了以下内容:

spark.sql.windowExec.buffer.in.memory.threshold(内部)保证在内存中保存的行数阈值 WindowExec物理操作员。

默认:4096

使用SQLConf.windowExecBufferInMemoryThreshold方法访问当前 价值。

说到WindowExec操作符的内部属性,你可能还需要另一个性能调优:

spark.sql.windowExec.buffer.spill.threshold(内部)WindowExec 物理运算符中缓冲的行数阈值。

默认:4096

使用SQLConf.windowExecBufferSpillThreshold方法访问当前值。

唉,我无法完全解释内部原理。

【讨论】:

    猜你喜欢
    • 2020-09-09
    • 1970-01-01
    • 2022-08-13
    • 2021-12-11
    • 2021-09-05
    • 1970-01-01
    • 2022-08-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多