【问题标题】:Compare two columns in a dataframe and find the rate of change of values比较数据框中的两列并找出值的变化率
【发布时间】:2019-10-15 16:40:09
【问题描述】:

我正在尝试比较数据框中的两列并找出值的变化率。 我编写了一个 UDF 来实现这一点,但在执行时出错。

下面是dataframe中的数据结构。

+------------+-------------+-----------+------+
| NUM_ID     | TIME        |PREVIOUS_SG1|SG1_V|
+------------+-------------+-----------+------+
|XXXXX01     |1570167499000|  null     |79.0  |
|XXXXX01     |1570167502000|   79.0    |88.0  |
|XXXXX01     |1570167503000|  88.0     |99.0  |
|XXXXX01     |1570179810000|  99.0     |null  |
|XXXXX01     |1570179811000|  null     |100.0 |

以下是此数据框的架构。

scala> castDF.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- TIME: long (nullable = true)
 |-- PREVIOUS_SG1: double (nullable = true)
 |-- SG1_V: double (nullable = true)

下面是编写的UDF。

def UDF_D:UserDefinedFunction=udf((PREV: Double,CURR: Double)=>{
  if(PREV != null || PREV !=0){
  val out = ((CURR-PREV)/PREV)*100
  out
  }})

以及调用 UDF 的 scala 代码

val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))

执行时出现错误。

scala> val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
java.lang.UnsupportedOperationException: Schema for type AnyVal is not supported

是否需要进行任何强制转换来调用 UDF 或空值是否会导致问题?我希望我传递的是 Double 值,而不是处理任何其他类型。

【问题讨论】:

  • 如果您解释问题中的解决方案/错误,这对我非常有帮助,因为我是 scala 和 DF 技术的新手,在投反对票之前。
  • 请分享预期输出
  • @Antony,你给了我们预期的输出吗?我非常怀疑您的 udf 并非每次都返回一个值,这导致它失败。

标签: scala dataframe apache-spark apache-spark-sql


【解决方案1】:

调用 UDF 不需要进行任何转换,但是 UDF 和列类型应该是同步的。空值也不会导致问题。

问题在于 UDF,UDF 应该总是返回一个值。当输入数据为null或0时,在UDF中添加else条件;

def UDF_D: UserDefinedFunction = udf((PREV: Double, CURR: Double) => {
    if (PREV != null || PREV != 0 || CURR != null || CURR != 0) {
      val out = ((CURR - PREV) / PREV) * 100
      out
    } else 0
})

【讨论】:

    【解决方案2】:

    你不需要 udf 来做这个

    df.select(when(('PREV.isNull || 'CURR === 0),  (('CURR-'PREV)/'PREV)*100).otherwise(0))
    

    作为函数

     def compareCols(PREV: Column, CURR: Column): Column = {
        when((PREV.isNull || CURR === 0),  ((CURR-PREV)/PREV)*100).otherwise(0)
      }
    
     val diffDF = df.withColumn("SG1_DIFF", compareCols('PREV,'CURR))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-03-23
      • 1970-01-01
      • 1970-01-01
      • 2021-09-17
      • 2017-07-04
      • 2020-02-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多