【发布时间】:2019-01-31 00:53:49
【问题描述】:
我正在尝试构建一个递归重写 ArrayType 列的 spark 函数:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions._
val arrayHead = udf((sequence: Seq[String]) => sequence.head)
val arrayTail = udf((sequence: Seq[String]) => sequence.tail)
// re-produces the ArrayType column recursively
val rewriteArrayCol = (c: Column) => {
def helper(elementsRemaining: Column, outputAccum: Column): Column = {
when(size(elementsRemaining) === lit(0), outputAccum)
.otherwise(helper(arrayTail(elementsRemaining), concat(outputAccum, array(arrayHead(elementsRemaining)))))
}
helper(c, array())
}
// Test
val df =
Seq("100" -> Seq("a", "b", "b", "b", "b", "b", "c", "c", "d"))
.toDF("id", "sequence")
// .withColumn("test_tail", arrayTail($"sequence")) //head & tail udfs work
// .withColumn("test", rewriteArrayCol($"sequence")) //stackoverflow if uncommented
display(df)
不幸的是,我不断收到堆栈溢出。我认为该功能缺乏的一个领域是它不是尾递归的。即整个 'when().otherwise()' 块与 'if else' 块不同。话虽这么说,该函数目前在应用于即使是很小的数据帧时也会引发 stackoverflow(所以我认为它肯定有更多的错误,而不仅仅是不是尾递归)。
我无法在网上找到任何类似功能的示例,所以我想在这里问一下。我能找到的 Column => Column 函数的唯一实现是非常非常简单的,它们对这个用例没有帮助。
注意:我可以通过使用 UDF 来实现上述功能。我尝试创建 Column => Column 函数的原因是,与 UDF 相比,Spark 能够更好地优化这些函数(据我所知)。
【问题讨论】:
标签: scala apache-spark apache-spark-sql