【问题标题】:avoid shuffling and long plans on multiple joins in pyspark避免在 pyspark 中对多个连接进行洗牌和长期计划
【发布时间】:2020-04-21 19:10:33
【问题描述】:

我正在使用同一个数据框进行多次连接 我加入的数据帧是我原始数据帧上分组的结果。

    listOfCols = ["a","b","c",....]
    for c in listOfCols:
        means=df.groupby(col(c)).agg(mean(target).alias(f"{c}_mean_encoding"))
        df=df.join(means,c,how="left")

这段代码产生了超过 100000 个任务并且需要很长时间才能完成。 我看到在 dag 发生了很多洗牌。 如何优化这段代码?

【问题讨论】:

  • 这真的取决于 listOfCols 的大小,尽管您可以尝试使用 persist 每次 N 迭代保存 df
  • 这个列表非常大,大约 10 列,有些是高基数字段。坚持将如何帮助?缓存较小的分组数据框没有帮助
  • 这将有助于将中间结果保存到磁盘并简化执行计划,Spark 不会评估持久化的部分,请查看讨论here
  • 这和 checkpoint() 所做的不一样吗?无论如何,这两个选项也需要很长时间,我猜洗牌部分是真正沉重的部分。 spark sql中直接使用sql join会更优化吗?

标签: apache-spark pyspark apache-spark-sql pyspark-dataframes


【解决方案1】:

好吧,经过多次尝试和失败,我想出了最快的解决方案。 而不是这项工作的 1.5 小时,它运行了 5 分钟.... 我会把它放在这里,所以如果有人偶然发现它 - 他/她不会像我一样受苦...... 解决方案是使用 spark sql ,它必须在内部比使用数据框 API 优化得多:

df.registerTempTable("df")
for c in listOfCols:
    left_join_string  += f" left join means_{c} on df.{c} = means_{c}.{c}"
    means = df.groupby(F.col(c)).agg(F.mean(target).alias(f"{c}_mean_encoding"))
    means.registerTempTable(f"means_{c}")

df = sqlContext.sql("SELECT * FROM df "+left_join_string)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-28
    • 1970-01-01
    • 2020-02-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多