【发布时间】:2021-05-18 06:07:53
【问题描述】:
所以我执行必要的导入等
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._
然后定义一些经纬度点
val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)
val york = (4.0, 4.0)
然后我像这样创建一个 spark Dataframe 并检查它是否有效:
val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
(List(york,london),List(southampton,suburbia))).toDF("AR1","AR2")
exampleDF.show()
数据框由以下类型组成
DataFrame = [AR1: array<struct<_1:double,_2:double>>, AR2: array<struct<_1:double,_2:double>>]
我创建了一个函数来创建点的组合
// function to do what I want
val latlongexplode = (x: Array[(Double,Double)], y: Array[(Double,Double)]) => {
for (a <- x; b <-y) yield (a,b)
}
我检查该功能是否有效
latlongexplode(Array(london,york),Array(suburbia,southampton))
确实如此。但是,在我使用此函数创建 UDF 后
// declare function into a Spark UDF
val latlongexplodeUDF = udf (latlongexplode)
当我尝试在上面创建的 spark 数据框中使用它时:
exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2")).show(false)
我得到了一个很长的堆栈跟踪,基本上可以归结为:
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 不能转换为 [Lscala.Tuple2;
org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$3(ScalaUDF.scala:121) org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063) org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:151) org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:50) org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:32) scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
如何让这个 udf 在 Scala Spark 中工作? (如果这有帮助,我目前正在使用 2.4)
编辑:可能是我构建示例 df 的方式存在问题。 但我所拥有的实际数据是每列上的纬度/经度元组的数组(大小未知)。
【问题讨论】:
-
您可能想就此联系 Raphael Roth,他似乎比大多数人都走得更远。
-
它与数组的结构方面有关,但我不知道如何解决这个问题。
-
@raphaelroth 你能评论一下吗?
-
@thebluephantom 不需要拉斐尔,我已经解决了 :)
-
@mck 感谢您的解释......以及解决方案。真的很感激。
标签: scala apache-spark user-defined-functions