【问题标题】:Spark Scala UDF NullPointer ExceptionSpark Scala UDF NullPointer 异常
【发布时间】:2021-12-07 19:27:35
【问题描述】:

我正在尝试将包含以下格式数据的列拆分并转换为 udf 内的映射。

UDF:

def convertToMapUDF = udf((c: String) => {
  val arr = c.split(",")
  val l = arr.toList
  
  val regexPattern = ".*(=).*".r
  
  println(s"column value: $c")
  
  s"$c" match {
    case regexPattern(a) => Some(l.map(x => x.split("=")).map(a => a(0).toString -> a(1).toString).toMap)
    case "null" => Some(Map[String, String]())
  }
})

val splitColList = List("r_split")
val d = ft.select(splitColList.map(c => convertToMapUDF(col(c))): _*)

r_split 列包含类似的数据

null
null
As=true, eMsion_New:E=true, HR:E=true, Don:E=true, Hrs=true, PAD:E=true, mog:E=true, N4k:E=true, WY:E=true, AT:E=true, Dt_RFC:E=true, DASH_ALL:E=true, TE:E=true, C_14:E=true, We:E=true, PG:E=true, ZR:E=true, MP:E=true, m2M:E=true, HC:E=true, Nos:E=true,
null

例外:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3712.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3712.0 (TID 37567, 10.73.35.140, executor 48): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$10845/2054440395: (string) => map<string,string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at line6ec706fd41324bde944dab3ff50b81c6224.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$convertToMapUDF$1(command-1649645762990220:2)
    ... 14 more

我尝试使用 Option 和 Some 但问题没有得到解决。 在上述字符串上作为独立函数执行时的 udf 工作正常。

【问题讨论】:

  • “null”是字符串吗?还是应该创建case nullcase _
  • null 作为字符串传入
  • 当我使用 r_split 示例创建示例 df 时,UDF 工作正常。
  • 但如果我在数据框中包含实际的空值,则不起作用!

标签: scala apache-spark nullpointerexception user-defined-functions


【解决方案1】:

空值来自您的数据集。您可以使用 cala.util.Try 使您的 udfs null 安全(如果您不想丢失记录)。只需将convertToMapUDF 包装到 Try 构造中即可。它将确保捕获任何非致命异常并返回一个失败对象。

def convertToMapUDF: UserDefinedFunction = udf((c: String) => {
    Try {
        val arr = c.split(",")
        val l = arr.toList

        val regexPattern = ".*(=).*".r

        println(s"column value: $c")

        s"$c" match {
            case regexPattern(a) => l.map(x => x.split("=")).map(a => a(0) -> a(1)).toMap
            case _ => Map[String, String]()
        }
    }.toOption
})

【讨论】:

  • 这种方法有一个主要缺点 - 除了忽略空值的预期操作之外,它还会抑制有关代码逻辑问题的任何警报。
  • 通过对UDF进行单元测试可以轻松解决。
  • 单元测试总是好的,在这种情况下它可能就足够了,因为逻辑看起来很简单,但是当处理空值和让所有其他例外都经过并提醒用户
【解决方案2】:

您根本不需要在这里使用 UDF:Spark 具有您需要的所有功能。考虑以下几点:

ft
  .withColumn("id", monotonically_increasing_id())
  .select(col("id"), explode(split(col("r_split"), ",")) as "r")
  .select(col("id"), split(col("r"), "=") as "s")
  .select(col("id"), map_from_arrays(array(col("s")(0)) as "k", array(col("s")(1)) as "v") as "map")
  .groupBy("id").agg(collect_list("map") as "maps")
  .select(
   aggregate(col("maps"), typedLit(Map[String, String]()), (acc, nxt) => map_concat(acc, nxt)) as "r_split"
  )

首先我们添加一个 ID,以便我们可以在最后将所有内容重新组合在一起,然后它会获取您的输入字符串 r_split,并在“,”上将其拆分。然后它“分解”结果数组,以便每个术语都有自己的Row。然后我们拆分“=”符号上的行以创建另一个数组。接下来,我们通过选择数组的第一个元素作为键,将第二个元素作为值来创建一个映射。这为我们提供了每个条目的映射,因此我们通过 ID(因此是第一步)将它们收集到一个列表中。最后,我们将该列表简化为单个地图。

我觉得这里的步骤太多了,使用regexp_extract 可能我们可以更有效地完成各种拆分/分解阶段,但这证明了这个想法,而无需使用 UDF。

【讨论】:

    【解决方案3】:

    最后一行是不必要的混淆,与您的问题无关。你的意思是

    val d = ft.select(convertToMapUDF(col("r_split")))
    

    对吗?

    无论如何,对于缺失值 (null),您期望什么行为?我猜你想跳过它们,那就是保持null。此行为在 spark 中用于处理非可空类型(Float/Int/etc)的 udfs。对于可以为空的类型,包括String,需要自己实现。 在 udf 中首先检查 c == null 并在这种情况下返回 null。抱歉,我看到您的函数返回 Option[Map[_,_]]。在这种情况下返回None

    我认为(目前不确定)您也可以将您的参数更改为Option[String],但您仍然需要自己处理None 案例,也许还有null。我对此有点模糊。

    【讨论】:

      猜你喜欢
      • 2020-11-13
      • 1970-01-01
      • 2018-11-22
      • 1970-01-01
      • 2020-08-13
      • 2020-11-14
      • 2012-09-13
      • 2016-12-02
      • 1970-01-01
      相关资源
      最近更新 更多