【问题标题】:improvement of a snippet of scala/spark code to improve performance改进一段 scala/spark 代码以提高性能
【发布时间】:2021-01-12 06:58:32
【问题描述】:

我正在尝试计算某列的平均值并将其保存为新列,以下是我实现此目的的代码 sn-p:

df = df.withColumn("avg_colname", lit(df.select(avg("colname").as("temp")).first().getAs("temp")))

总共需要计算 8 列。在使用“spark-submit”命令的小型 3 节点集群上,代码执行所花费的时间比使用“spark-shell”命令的单台机器上要多得多(几分钟对几秒钟)。

为什么代码在集群上执行比在单机上慢,上面的sn-p代码如何改进?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您可以使用窗口函数重写您的代码:

    df = df.withColumn("avg_colname", avg("colname").over())
    // add other columns
    

    或者以其他方式加入平均值:

    df = df.crossJoin(broadcast(
       df.agg(
          avg("colname").as("avg_colname")
          // add other columns
        )
     ))
    

    这两种方法应该给出相同的结果,但它们的工作方式不同:窗口函数会将所有数据移动到 1 个分区,而第二种方法将使用部分聚合,并且可以更好地扩展大型数据集

    您也可以尝试缓存初始数据帧并检查这是否有帮助。另请注意,如果您的数据很小,分布式计算只会增加开销并使其变慢。如果您知道数据很小,最好是使用 1 台机器和 1 个分区...

    【讨论】:

    • 谢谢,第一种方法解决了我的问题(自从第一种方法有效以来,还没有测试过第二种方法)。我还有两个问题。 1.您能否参考有关“将所有数据移动到 1 个分区”行为的源/文档? 2.一个数据集有多大才算大(以行计)?
    【解决方案2】:

    如果数据很小,我同意 Raphael 将其保留为 1 台机器和 1 个分区。 此外,我想添加我的答案,让您更深入地了解幕后发生的事情。

    您询问的代码具有如下所示的解释计划(注意:我仅显示平均值的计算,列的添加是惰性转换,因此不会触发实际计算):

    scala> df.select(avg("colname").as("temp")).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[], functions=[avg(cast(col_2#6 as bigint))])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[partial_avg(cast(col_2#6 as bigint))])
          +- LocalTableScan [col_2#6]
    

    如上面的代码 sn-p 所示,为了计算平均值,会发生一个交换。

    这回答了为什么您在使用 3 节点集群时会看到速度变慢的问题。交换会导致数据在不同分区之间发生网络混洗,从而导致速度变慢(网络操作比内存或缓存操作慢得多)。

    要回答关于如何改进它的第二个问题,将取决于您的应用程序的详细信息。您不能对代码进行太多更改,因为您仍然需要计算平均值。但是您可以查看以下两件事:

    1. 如果可能,请尝试过滤您的数据。要洗牌的数据越少,洗牌时间就越短。过滤器将被下推,因此它将在随机播放之前执行。
    2. 查看集群中可能存在的偏差。这可以通过查看 spark ui 找到。由于节点或网络速度较慢而导致的偏差可能会导致整个应用程序变慢。换句话说,您的应用程序将与最慢的节点一样快。

    我会推荐上面的1。在这种情况下,它通常是唾手可得的成果,如果您了解自己的数据及其用例,实现起来会容易得多。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-06-18
      • 2020-07-20
      • 1970-01-01
      • 1970-01-01
      • 2020-12-31
      相关资源
      最近更新 更多