【发布时间】:2017-05-17 16:00:20
【问题描述】:
一个函数应该针对数据框中的多个列执行
def handleBias(df: DataFrame, colName: String, target: String = target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
这可以用 spark-SQL 和 for 循环很好地编写,如上所示。然而,这导致了很多洗牌 (spark apply function to columns in parallel)。
一个最小的例子:
val df = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
如何使用 RDD API 更有效地制定这个? aggregateByKey应该是一个好主意,但我仍然不太清楚如何在此处应用它来替换窗口函数。
(提供更多上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding)
编辑
最初,我从Spark dynamic DAG is a lot slower and different from hard coded DAG 开始,如下所示。好消息是,每列似乎都是独立/并行运行的。缺点是连接(即使对于 300 MB 的小数据集)变得“太大”并导致无响应的火花。
handleBiasOriginal("col1", df)
.join(handleBiasOriginal("col2", df), df.columns)
.join(handleBiasOriginal("col3TooMany", df), df.columns)
.drop(columnsToDrop: _*).show
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
此图片使用 spark 2.1.0,来自Spark dynamic DAG is a lot slower and different from hard coded DAG 的图片使用 2.0.2
应用缓存时,DAG 会更简单一些 df.cache handleBiasOriginal("col1", df)。 ...
除了窗口函数之外,您还看到了哪些优化 SQL 的可能性? 最好是动态生成 SQL。
【问题讨论】:
标签: scala apache-spark apache-spark-sql rdd