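【Posted】: 2018-01-02 19:02:44
【Question】:
I have the following schema, and I want to add a new column named distance. For each row, this column holds the distance between the two time series time_series1 and time_series2: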
|-- websites: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: integer (nullable = false)
|-- countryId1: integer (nullable = false)
|-- countryId2: integer (nullable = false)
|-- time_series1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: float (nullable = false)
| | |-- _2: date (nullable = true)
|-- time_series2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: float (nullable = false)
| | |-- _2: date (nullable = true)
So I defined the new column with a UDF:
val step2= step1
.withColumn("distance", distanceUDF(col("time_series1"),col("time_series2")))
.select("websites","countryId1","countryId2","time_series1","time_series2","distance")
And the UDF:
val distanceUDF = udf( (ts1:Seq[(Float,_)], ts2:Seq[(Float,_)])=>
compute_distance( ts1.map(_._1) , ts2.map(_._1)))
But I have a problem with the type mapping: I don't know how to map an array(struct(float, date)) to a Scala type.
Is Seq[(Float, Date)] equivalent to array(struct(float, date))?
I get the following exception:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
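My question is different from the one raised in "Spark Sql UDF with complex input parameter". I have an ordered time series with dates (that is, an array of structs, not just a single struct).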
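For context on the exception: Spark hands struct values to a Scala UDF as Row objects, not as tuples, so an array(struct(float, date)) column arrives as Seq[Row] and the cast to Tuple2 fails. The following is only a minimal sketch under stated assumptions, not a definitive fix: computeDistance is a hypothetical stand-in (the body of compute_distance is not shown in the question), and the object name, the local SparkSession, and the sample rows are made up for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object DistanceUdfSketch {
  // Hypothetical stand-in for the question's compute_distance (not shown there):
  // Euclidean distance over the overlapping prefix of the two series.
  def computeDistance(a: Seq[Float], b: Seq[Float]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => val d = (x - y).toDouble; d * d }.sum)

  // array<struct<_1: float, _2: date>> reaches the UDF as Seq[Row],
  // so each element is read with Row accessors instead of tuple fields.
  val distanceUDF = udf { (ts1: Seq[Row], ts2: Seq[Row]) =>
    computeDistance(ts1.map(_.getFloat(0)), ts2.map(_.getFloat(0)))
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session, only for trying the sketch out.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("distance-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    val d1 = java.sql.Date.valueOf("2018-01-01")
    val d2 = java.sql.Date.valueOf("2018-01-02")

    // Made-up sample row with the same column types as the question's schema.
    val step1 = Seq(
      (Seq((1.0f, d1), (2.0f, d2)), Seq((1.0f, d1), (4.0f, d2)))
    ).toDF("time_series1", "time_series2")

    val step2 = step1.withColumn(
      "distance",
      distanceUDF(col("time_series1"), col("time_series2"))
    )
    step2.select("distance").show()

    spark.stop()
  }
}
```

Since both array columns are nullable in the schema above, a real version would probably also need a null guard before calling map on ts1 and ts2.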
【Discussion】:
Tags: scala apache-spark struct user-defined-functions