【发布时间】:2021-11-17 08:34:24
【问题描述】:
我正在使用 pyspark 例程来插入配置表中的缺失值。
想象一个从 0 到 50,000 的配置值表。用户在两者之间指定几个数据点(比如 0、50、100、500、2000、500000),然后我们对剩余部分进行插值。我的解决方案主要遵循 this blog post,但我没有使用任何 UDF。
在对此性能进行故障排除(大约需要 3 分钟)时,我发现一个特定的窗口函数一直在占用所有时间,而我正在做的其他事情只需要几秒钟。
这里是主要感兴趣的区域 - 我使用窗口函数来填写上一个和下一个用户提供的配置值:
from pyspark.sql import Window, functions as F
# Create partition windows that are required to generate new rows from the ones provided
win_last = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
win_next = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)
# Join back in the provided config table to populate the "known" scale factors
df_part1 = (df_scale_factors_template
.join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
# Add computed columns that can lookup the prior config and next config for each missing value
.withColumn('last_rank', F.last( F.col('rank'), ignorenulls=True).over(win_last))
.withColumn('last_sf', F.last( F.col('scale_factor'), ignorenulls=True).over(win_last))
).cache()
debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
).cache()
debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2
df_part3 = (df_part2
# Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * (x-x1)
.withColumn('scale_factor',
F.when(F.col('last_rank')==F.col('next_rank'), F.col('last_sf')) # Handle div/0 case
.otherwise(F.col('last_sf') + ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) * (F.col('rank')-F.col('last_rank'))))
.select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
).cache()
debug_log_dataframe(df_part3, 'df_part3', explain: True) # Force a .count() and time Part3
上面曾经是一个单链数据框语句,但我已经将它分成 3 个部分,以便我可以隔离需要这么长时间的部分。结果是:
Part 1: Generated 8 columns and 300006 rows in 0.65 secondsPart 2: Generated 10 columns and 300006 rows in 189.55 secondsPart 3: Generated 4 columns and 300006 rows in 0.24 seconds
为什么我对first() 的调用比Window.unboundedFollowing 比last() 比Window.unboundedPreceding 花费的时间长这么多?
一些避免问题/疑虑的注意事项:
-
debug_log_dataframe只是一个辅助函数,用于强制使用.Count()执行数据帧/缓存并计时以产生上述日志。 - 我们实际上是同时操作 6 个 50001 行的配置表(因此需要分区和行数)
- 作为一个健全的检查,我已经通过明确的
unpersist()ing 在安排后续运行之前排除了cache()重用的影响 - 我对上述测量结果非常有信心。
物理计划:
为了帮助回答这个问题,我在 part3 的结果上致电 explain() 以确认缓存正在产生预期的效果。此处注释以突出问题区域:
我能看到的唯一区别是:
- 前两个调用(对
last)显示RunningWindowFunction,而对next的调用只显示Window - 第 1 部分旁边有一个 *(3),但第 2 部分没有。
我尝试过的一些事情:
- 我尝试将第 2 部分进一步拆分为单独的数据帧 - 结果是每个
first语句占用总时间的一半(约 98 秒) - 我尝试颠倒生成这些列的顺序(例如,在调用 'first' 之后调用 'last'),但没有区别。无论哪个数据帧最终包含对
first的调用都是慢的。
我觉得我已经尽我所能地进行了挖掘,并且有点希望火花专家能够看一看知道这个时间来自哪里。
【问题讨论】:
-
我听说不止一个 DBA 发表了“避免使用 OLAP 函数,它们带来的麻烦多于其价值”的笼统声明,虽然我并不总是遵循这个建议,看到这样的东西肯定有助于我了解它们的来源。
-
顺便说一下,Spark 3.1.1,任何想知道这是否是版本问题的人。
-
我创建了issues.apache.org/jira/browse/SPARK-36844 是因为这是无意的行为。
标签: performance apache-spark pyspark apache-spark-sql window-functions