【发布时间】:2018-07-25 11:46:49
【问题描述】:
我一直认为 Spark 不允许定义 User-Defined-Window-Functions。我刚刚从这里 (https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) 测试了“几何平均值”UDAF 示例作为窗口函数,它似乎工作得很好,例如:
val geomMean = new GeometricMean
(1 to 10).map(i=>
(i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()
+---+----+------------------+
| i| x| geom_mean|
+---+----+------------------+
| 1| 1.0|1.4142135623730951|
| 2| 2.0|1.8171205928321397|
| 3| 3.0|2.8844991406148166|
| 4| 4.0|3.9148676411688634|
| 5| 5.0| 4.93242414866094|
| 6| 6.0| 5.943921952763129|
| 7| 7.0| 6.952053289772898|
| 8| 8.0| 7.958114415792783|
| 9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
我从未见过 spark 文档谈论使用 UDAF 作为窗口函数。这是否允许,即结果是否正确?顺便说一下,我使用的是 spark 2.1
编辑:
让我感到困惑的是,在标准聚合中(即后跟 groupBy),数据总是添加到缓冲区中,即它们总是会增长,不会缩小。使用窗口功能(尤其是与rowsBetween() 结合使用),数据也需要从缓冲区中删除,因为“旧”元素会随着排序定义的行移动而退出窗口。我认为窗口函数可以随着状态的顺序移动。所以我认为必须有类似“删除”的方法来实现
【问题讨论】:
-
你能帮我修改代码吗?因为我使用的是 spark2.3,并且我已经复制了您的代码,但现在不允许我像 geommean($"x") 那样传递 col("x"),它说:
org.apache.commons.math3.stat.descriptive.moment.GeometricMean does not take parametersbloop -
@Lingaraj 您似乎必须错误导入。 GeometricMean 不是来自 apache commons math,但您必须自己定义它(此处给出的代码:docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)
-
哦,好的,非常感谢!
标签: scala apache-spark dataframe user-defined-aggregate