【问题标题】:Spark migrate sql window function to RDD for better performanceSpark 将 sql 窗口函数迁移到 RDD 以获得更好的性能
【发布时间】:2017-05-17 16:00:20
【问题描述】:

一个函数应该针对数据框中的多个列执行

def handleBias(df: DataFrame, colName: String, target: String = target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }

这可以用 spark-SQL 和 for 循环很好地编写,如上所示。然而,这导致了很多洗牌 (spark apply function to columns in parallel)。

一个最小的例子:

  val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

  val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
  val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

  val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
    (currentDF, colName) => handleBias(currentDF, colName)
  }

  result.drop(columnsToDrop: _*).show

如何使用 RDD API 更有效地制定这个? aggregateByKey应该是一个好主意,但我仍然不太清楚如何在此处应用它来替换窗口函数。

(提供更多上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding

编辑

最初,我从Spark dynamic DAG is a lot slower and different from hard coded DAG 开始,如下所示。好消息是,每列似乎都是独立/并行运行的。缺点是连接(即使对于 300 MB 的小数据集)变得“太大”并导致无响应的火花。

handleBiasOriginal("col1", df)
    .join(handleBiasOriginal("col2", df), df.columns)
    .join(handleBiasOriginal("col3TooMany", df), df.columns)
    .drop(columnsToDrop: _*).show

  def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
    val pre1_1 = df
      .filter(df(target) === 1)
      .groupBy(col, target)
      .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
      .drop(target)

    val pre2_1 = df
      .groupBy(col)
      .agg(mean(target).alias("pre2_" + col))

    df
      .join(pre1_1, Seq(col), "left")
      .join(pre2_1, Seq(col), "left")
      .na.fill(0)
  }

此图片使用 spark 2.1.0,来自Spark dynamic DAG is a lot slower and different from hard coded DAG 的图片使用 2.0.2

应用缓存时,DAG 会更简单一些 df.cache handleBiasOriginal("col1", df)。 ...

除了窗口函数之外,您还看到了哪些优化 SQL 的可能性? 最好是动态生成 SQL。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd


    【解决方案1】:

    这里的重点是避免不必要的洗牌。现在,您的代码会为您想要包含的每一列洗牌两次,并且无法在列之间重用生成的数据布局。

    为简单起见,我假设 target 始终是二进制 ({0, 1}) 并且您使用的所有剩余列都是 StringType。此外,我假设列的基数足够低,可以在本地对结果进行分组和处理。您可以调整这些方法来处理其他情况,但这需要更多的工作。

    RDD API

    • 从宽到长重塑数据:

      import org.apache.spark.sql.functions._
      
      val exploded = explode(array(
        (columnsToDrop ++ columnsToCode).map(c => 
          struct(lit(c).alias("k"), col(c).alias("v"))): _*
      )).alias("level")
      
      val long = df.select(exploded, $"TARGET")
      
    • aggregateByKey,重塑收藏:

      import org.apache.spark.util.StatCounter
      
      val lookup = long.as[((String, String), Int)].rdd
        // You can use prefix partitioner (one that depends only on _._1)
        // to avoid reshuffling for groupByKey
        .aggregateByKey(StatCounter())(_ merge _, _ merge _)
        .map { case ((c, v), s) => (c, (v, s)) }
        .groupByKey
        .mapValues(_.toMap)
        .collectAsMap
      
    • 您可以使用lookup 获取各个列和级别的统计信息。例如:

      lookup("col1")("A")
      
      org.apache.spark.util.StatCounter = 
        (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
      

      为您提供col1 级别A 的数据。基于二进制 TARGET 假设,此信息是完整的(您可以获得两个类的计数/分数)。

      您可以使用这样的查找来生成 SQL 表达式或将其传递给 udf 并将其应用于各个列。

    数据帧 API

    • 将数据转换为 RDD API 的 long。
    • 根据级别计算聚合:

      val stats = long
        .groupBy($"level.k", $"level.v")
        .agg(mean($"TARGET"), sum($"TARGET"))
      
    • 根据您的喜好,您可以重新调整它以实现高效连接或转换为本地集合,类似于 RDD 解决方案。

    【讨论】:

      【解决方案2】:

      使用 aggregateByKey 关于aggregateByKey 的简单解释可以在here 找到。基本上你使用两个函数:一个在分区内工作,一个在分区之间工作。

      您需要按第一列进行聚合,并在内部为第二列的每个元素构建一个带有映射的数据结构,以便在那里聚合和收集数据(当然,如果需要,您可以执行两个 aggregateByKey)。 这不会解决对您要使用的每一列的代码进行多次运行的情况(您可以使用聚合而不是聚合ByKey 来处理所有数据并将其放入地图中,但这可能会让您更糟表现)。结果将是每个键一行,如果您想移回原始记录(就像窗口函数一样),您实际上需要将此值与原始 RDD 连接或在内部保存所有值和平面图

      我认为这不会为您带来任何真正的性能改进。您将做大量工作来重新实现在 SQL 中为您完成的事情,同时您将失去 SQL 的大部分优势(催化剂优化、钨内存管理、全阶段代码生成等)

      改进 SQL

      我会做的是尝试改进 SQL 本身。 例如,窗口函数中列的结果对于所有值似乎都是相同的。你真的需要一个窗口函数吗?您可以改为使用 groupBy 而不是窗口函数(如果您真的需要每条记录都需要这个,您可以尝试加入结果。这可能会提供更好的性能,因为它不一定意味着在每一步都洗牌两次)。

      【讨论】:

      • 请参阅stackoverflow.com/questions/41169873/… 以及我上面的编辑。最初,我开始使用带有连接的 group-by。这导致工作没有在合理的时间内完成/spar 似乎没有执行任何操作。虽然连接解决方​​案适用于小数据,但我无法让它与许多列一起使用。期待有关如何改进 SQL 的建议。
      • 我并不是说加入一定是解决方案。我的意思是,在大多数情况下,带有 aggregateByKey 的 RDD 会更慢。您可以继续尝试使用我显示的链接以及如何实现它的基本逻辑来使用 aggregateByKey。
      • 同时,您是否看到了一种不使用慢速窗口函数但仍阻止使用连接的方法?
      • 您显示的链接也与血统的构建方式有关。我会尝试解决这个问题,而不是去 aggregateByKey
      • 主要问题是你想做什么。例如,您需要每条记录的列值还是只需要最终值?如果您需要列值,可以使用 groupby。
      猜你喜欢
      • 2021-05-03
      • 1970-01-01
      • 1970-01-01
      • 2021-04-17
      • 2015-01-08
      • 1970-01-01
      • 1970-01-01
      • 2014-11-19
      • 1970-01-01
      相关资源
      最近更新 更多