【发布时间】:2020-05-13 01:24:14
【问题描述】:
我有一个非常简单的 pyspark SQL 应用程序(spark 2.4.4,EMR 5.29),它读取模式主题、年份、计数的数据框:
df.show()
+--------+----+------+
| topic|year| count|
+--------+----+------+
|covid-19|2017|606498|
|covid-19|2016|454678|
|covid-19|2011| 10517|
|covid-19|2008| 6193|
|covid-19|2015|510391|
|covid-19|2013| 29551|
然后我需要按年份排序并将计数收集到一个列表中,以便它们按年份按升序排列:
df.orderBy('year').groupBy('topic').agg(collect_list('count').alias('counts'))
问题是,由于我按年排序,因此用于此阶段的分区数是我数据集中的年数。因此,我遇到了一个疯狂的瓶颈阶段,其中使用了 300 个执行程序中的 15 个,导致明显的内存溢出和磁盘溢出,最终由于设备上没有空间用于过度填充的分区而导致该阶段失败。
更有趣的是,我找到了一种规避此问题的方法,这种方法在直觉上看起来效率要低得多,但实际上确实有效,因为不会产生瓶颈:
df.groupBy('topic').pivot('year', values=range(START, FINISH)).agg(first('count')) \
.select('topic', array([col(c) for c in range(START, FINISH)]).alias('counts'))
这导致了我想要的输出,这是一个按年份排序的计数数组。
任何人解释或知道为什么会发生这种情况,或者如何最好地防止这种情况? 我发现 this answer which 和 this jira 基本上建议在排序中“添加噪音”以避免这些与倾斜相关的问题。
我认为值得一提的是,枢轴方法比添加噪声具有更好的分辨率,并且据我所知,每当按具有小范围值的列进行排序时。将不胜感激有关此实现和替代实现的任何信息。
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql