【发布时间】:2019-10-15 16:40:09
【问题描述】:
我正在尝试比较数据框中的两列并找出值的变化率。 我编写了一个 UDF 来实现这一点,但在执行时出错。
下面是dataframe中的数据结构。
+------------+-------------+-----------+------+
| NUM_ID | TIME |PREVIOUS_SG1|SG1_V|
+------------+-------------+-----------+------+
|XXXXX01 |1570167499000| null |79.0 |
|XXXXX01 |1570167502000| 79.0 |88.0 |
|XXXXX01 |1570167503000| 88.0 |99.0 |
|XXXXX01 |1570179810000| 99.0 |null |
|XXXXX01 |1570179811000| null |100.0 |
以下是此数据框的架构。
scala> castDF.printSchema
root
|-- NUM_ID: string (nullable = true)
|-- TIME: long (nullable = true)
|-- PREVIOUS_SG1: double (nullable = true)
|-- SG1_V: double (nullable = true)
下面是编写的UDF。
def UDF_D:UserDefinedFunction=udf((PREV: Double,CURR: Double)=>{
if(PREV != null || PREV !=0){
val out = ((CURR-PREV)/PREV)*100
out
}})
以及调用 UDF 的 scala 代码
val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
执行时出现错误。
scala> val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
java.lang.UnsupportedOperationException: Schema for type AnyVal is not supported
是否需要进行任何强制转换来调用 UDF 或空值是否会导致问题?我希望我传递的是 Double 值,而不是处理任何其他类型。
【问题讨论】:
-
如果您解释问题中的解决方案/错误,这对我非常有帮助,因为我是 scala 和 DF 技术的新手,在投反对票之前。
-
请分享预期输出
-
@Antony,你给了我们预期的输出吗?我非常怀疑您的 udf 并非每次都返回一个值,这导致它失败。
标签: scala dataframe apache-spark apache-spark-sql