【问题标题】:Can every Spark UDAF be used with Window?每个 Spark UDAF 都可以与 Window 一起使用吗?
【发布时间】:2018-07-25 11:46:49
【问题描述】:

我一直认为 Spark 不允许定义 User-Defined-Window-Functions。我刚刚从这里 (https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) 测试了“几何平均值”UDAF 示例作为窗口函数,它似乎工作得很好,例如:

val geomMean = new GeometricMean

(1 to 10).map(i=>
  (i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+

我从未见过 spark 文档谈论使用 UDAF 作为窗口函数。这是否允许,即结果是否正确?顺便说一下,我使用的是 spark 2.1

编辑:

让我感到困惑的是,在标准聚合中(即后跟 groupBy),数据总是添加到缓冲区中,即它们总是会增长,不会缩小。使用窗口功能(尤其是与rowsBetween() 结合使用),数据也需要从缓冲区中删除,因为“旧”元素会随着排序定义的行移动而退出窗口。我认为窗口函数可以随着状态的顺序移动。所以我认为必须有类似“删除”的方法来实现

【问题讨论】:

  • 你能帮我修改代码吗?因为我使用的是 spark2.3,并且我已经复制了您的代码,但现在不允许我像 geommean($"x") 那样传递 col("x"),它说:org.apache.commons.math3.stat.descriptive.moment.GeometricMean does not take parametersbloop
  • @Lingaraj 您似乎必须错误导入。 GeometricMean 不是来自 apache commons math,但您必须自己定义它(此处给出的代码:docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
  • 哦,好的,非常感谢!

标签: scala apache-spark dataframe user-defined-aggregate


【解决方案1】:

我不确定您的问题到底是什么。

每个 Spark UDAF 都可以与 Window 一起使用吗?

是的

这是我在这个话题上的个人经验:

我最近一直在使用 Spark window functionsUDAFs (Spark 2.0.1),我确认它们可以很好地协同工作。结果是正确的(假设您的 UDAF 编写正确)。 UDAF 写起来有点麻烦,但是一旦你得到它,下一个它就会很快。

我没有测试所有这些,但来自org.apache.spark.sql.functions._ 的内置聚合函数也对我有用。在functions 中搜索聚合。我主要与一些经典聚合器合作,例如 sumcountavgstddev,它们都返回了正确的值。

【讨论】:

  • 让我感到困惑的是,我一直认为窗口函数还需要实现从缓冲区中删除元素的函数(想想使用rowsBetween(-1,1) 的移动/滚动平均值)。但显然情况并非如此
  • 这种事情在UDAF的实现中得到了处理,你必须定义你的缓冲区,如何更新它,如何合并和其他东西......然后是背后的引擎class UserDefinedAggregateFunction 知道如何处理给定范围或行的缓冲区。
  • 是的,我知道要实现哪种方法,但正如我所说,我希望窗口函数有一个定义如何删除元素的方法。将collect_list(..)rowsBetween(-1,1) 结合使用。随着窗口向前移动,我预计最旧的元素会被删除并添加新元素。但我现在很确定情况并非如此,collect_list 似乎从头开始为数据帧中的每一行建立缓冲区,尽管这似乎效率很低
  • 抱歉,我们可能混淆了术语。窗口函数是什么意思?对我来说,在 Spark 中有一个 Window+aggregator 应用于窗口,这就是我所说的窗口函数。我从未遇到过官方术语“用户定义的窗口函数”。您问每个 Spark UDAF 都可以与 Window 一起使用吗? 答案是肯定的 :)
  • 我认为我们的意思是一样的,很抱歉我无法解释我的意思。
猜你喜欢
  • 2017-06-02
  • 2021-01-13
  • 2021-12-05
  • 1970-01-01
  • 2016-10-05
  • 1970-01-01
  • 1970-01-01
  • 2011-08-15
  • 2018-02-16
相关资源
最近更新 更多