【问题标题】:SparkSQL: How to deal with null values in user defined function?SparkSQL:如何处理用户定义函数中的空值?
【发布时间】:2015-11-28 04:45:44
【问题描述】:

给定表 1,其中有一列“x”类型为字符串。 我想创建带有列“y”的表 2,该列是“x”中给出的日期字符串的整数表示。

基本是在“y”列中保留null 值。

表 1(数据框 df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)

表 2(数据框 df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)

而将“x”列的值转换为“y”列的值的用户定义函数(udf)是:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )

并且有效,处理空值是不可能的。

尽管如此,我可以做类似的事情

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else 1 )

我没有找到通过 udfs“产生”null 值的方法(当然,Ints 不能是 null)。

我目前创建 df2(表 2)的解决方案如下:

// holds data of table 1  
val df1 = ... 

// filter entries from df1, that are not null
val dfNotNulls = df1.filter(df1("x")
  .isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left join on df1 and dfNotNull having 
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

问题

  • 当前的解决方案看起来很麻烦(而且可能效率不高)。有没有更好的方法?
  • @Spark-developers:是否有类型NullableInt 计划/可用,以便可以使用以下 udf(参见代码摘录)?

代码摘录

val extractDateAsNullableInt = udf[NullableInt, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else null )

【问题讨论】:

标签: scala apache-spark apache-spark-sql user-defined-functions nullable


【解决方案1】:

这就是Option派上用场的地方:

val extractDateAsOptionInt = udf((d: String) => d match {
  case null => None
  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})

或在一般情况下使其更安全:

import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
  d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)

所有功劳归于Dmitriy Selivanov,他指出此解决方案是(缺失?)编辑here

替代方法是在 UDF 之外处理 null

import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
   (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
)

【讨论】:

  • 嗨 zero323,听起来很棒。会尝试一下,一旦成功,就会奖励你!顺便说一句,感谢您的快速响应!!!
  • 不要使用scala.util.Try的解决方案。它将捕获内部的任何错误。这不是好的编码风格。
  • @NiclasvonCaprivi 除了一些定义明确的异常(这些异常通常发生在执行计划解析级别)外,这就是 SQL 函数的行为方式——在意外情况下,该值是未定义的。
【解决方案2】:

Scala 实际上有一个不错的工厂函数 Option(),它可以使这更加简洁:

val extractDateAsOptionInt = udf((d: String) => 
  Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))

在内部,Option 对象的 apply 方法只是为您进行 null 检查:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x)

【讨论】:

    【解决方案3】:

    补充代码

    根据@zero323 的nice 回答,我创建了以下代码,以使用户定义的函数可用以处理所描述的空值。希望,对其他人有帮助!

    /**
     * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
     * handle `null` values.
     */
    object NullableFunctions {
    
      import org.apache.spark.sql.functions._
      import scala.reflect.runtime.universe.{TypeTag}
      import org.apache.spark.sql.UserDefinedFunction
    
      /**
       * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.
       *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
        udf[Option[RT],A1]( (i: A1) => i match {
          case null => None
          case s => Some(f(i))
        })
      }
    
      /**
       * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if on of the function input parameters is null, None is returned.
       *     This will create a null value in the output Spark column.
       *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
       *     as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @tparam A2 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
        udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {
          case (null, _) => None
          case (_, null) => None
          case (s1, s2) => Some((f(s1,s2)))
        } )
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-12-12
      • 1970-01-01
      • 2017-10-29
      • 2021-10-14
      • 2010-11-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多