【问题标题】:scala spark UDF ClassCastException : WrappedArray$ofRef cannot be cast to [Lscala.Tuple2scala spark UDF ClassCastException:WrappedArray$ofRef 无法转换为 [Lscala.Tuple2
【发布时间】:2021-05-18 06:07:53
【问题描述】:

所以我执行必要的导入等

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._

然后定义一些经纬度点

val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)  
val york = (4.0, 4.0)  

然后我像这样创建一个 spark Dataframe 并检查它是否有效:

val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
    (List(york,london),List(southampton,suburbia))).toDF("AR1","AR2")
exampleDF.show()

数据框由以下类型组成

DataFrame = [AR1: array<struct<_1:double,_2:double>>, AR2: array<struct<_1:double,_2:double>>]

我创建了一个函数来创建点的组合

// function to do what I want
val latlongexplode =  (x: Array[(Double,Double)], y: Array[(Double,Double)]) => {
 for (a <- x; b <-y) yield (a,b)
}

我检查该功能是否有效

latlongexplode(Array(london,york),Array(suburbia,southampton))

确实如此。但是,在我使用此函数创建 UDF 后

// declare function into a Spark UDF
val latlongexplodeUDF = udf (latlongexplode) 

当我尝试在上面创建的 spark 数据框中使用它时:

exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2")).show(false)

我得到了一个很长的堆栈跟踪,基本上可以归结为:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 不能转换为 [Lscala.Tuple2;
org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$3(ScalaUDF.scala:121) org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063) org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:151) org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:50) org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:32) scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:27​​3)

如何让这个 udf 在 Scala Spark 中工作? (如果这有帮助,我目前正在使用 2.4)

编辑:可能是我构建示例 df 的方式存在问题。 但我所拥有的实际数据是每列上的纬度/经度元组的数组(大小未知)。

【问题讨论】:

  • 您可能想就此联系 Raphael Roth,他似乎比大多数人都走得更远。
  • 它与数组的结构方面有关,但我不知道如何解决这个问题。
  • @raphaelroth 你能评论一下吗?
  • @thebluephantom 不需要拉斐尔,我已经解决了 :)
  • @mck 感谢您的解释......以及解决方案。真的很感激。

标签: scala apache-spark user-defined-functions


【解决方案1】:

在 UDF 中使用结构类型时,它们表示为 Row 对象,而数组列表示为 Seq。另外,你需要以Row的形式返回一个struct,并且你需要定义一个schema来返回一个struct。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)  
val york = (4.0, 4.0)
val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
    (List(york,london),List(southampton,suburbia))).toDF("AR1","AR2")
exampleDF.show(false)
+------------------------+------------------------+
|AR1                     |AR2                     |
+------------------------+------------------------+
|[[1.0, 1.0], [2.0, 2.0]]|[[3.0, 3.0], [4.0, 4.0]]|
|[[4.0, 4.0], [1.0, 1.0]]|[[3.0, 3.0], [2.0, 2.0]]|
+------------------------+------------------------+
val latlongexplode = (x: Seq[Row], y: Seq[Row]) => {
    for (a <- x; b <- y) yield Row(a, b)
}

val udf_schema = ArrayType(
    StructType(Seq(
        StructField(
            "city1",
            StructType(Seq(
                StructField("lat", FloatType),
                StructField("long", FloatType)
            ))
        ),
        StructField(
            "city2",
            StructType(Seq(
                StructField("lat", FloatType),
                StructField("long", FloatType)
            ))
        )
    ))
)

// include this line if you see errors like 
// "You're using untyped Scala UDF, which does not have the input type information."
// spark.sql("set spark.sql.legacy.allowUntypedScalaUDF = true")

val latlongexplodeUDF = udf(latlongexplode, udf_schema)
result = exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2"))
result.show(false)
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+
|AR1                     |AR2                     |latlongexplode                                                                                          |
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+
|[[1.0, 1.0], [2.0, 2.0]]|[[3.0, 3.0], [4.0, 4.0]]|[[[1.0, 1.0], [3.0, 3.0]], [[1.0, 1.0], [4.0, 4.0]], [[2.0, 2.0], [3.0, 3.0]], [[2.0, 2.0], [4.0, 4.0]]]|
|[[4.0, 4.0], [1.0, 1.0]]|[[3.0, 3.0], [2.0, 2.0]]|[[[4.0, 4.0], [3.0, 3.0]], [[4.0, 4.0], [2.0, 2.0]], [[1.0, 1.0], [3.0, 3.0]], [[1.0, 1.0], [2.0, 2.0]]]|
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+

【讨论】:

  • 印象深刻,我快到了。明天试试。
  • 刚看到。我发现这些错误消息很难理解。
  • 我原以为需要一个案例类。你去吧。
  • @thebluephantom 是的,我猜 case 类可能会更好 - 不推荐使用定义 udf 模式。但是要定义一个案例类的结构似乎有点复杂,所以我选择了 udf 模式。 OP 无论如何都在使用 spark 2.4,所以弃用不是问题。
  • @mck + thebluephantom 非常感谢你们!我与 Mamonu 合作开发了一款名为 Splink 的开源数据链接软件,该软件使用 Spark,这非常有用!
猜你喜欢
  • 1970-01-01
  • 2015-10-20
  • 2020-12-01
  • 2021-02-23
  • 2020-09-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-22
  • 2021-10-26
相关资源
最近更新 更多