【Posted】: 2021-12-07 19:27:35
【Question】:
I'm trying to split a column that contains data in the format shown below and convert it into a map inside a UDF.
UDF:
def convertToMapUDF = udf((c: String) => {
  val arr = c.split(",")
  val l = arr.toList
  val regexPattern = ".*(=).*".r
  println(s"column value: $c")
  s"$c" match {
    case regexPattern(a) => Some(l.map(x => x.split("=")).map(a => a(0).toString -> a(1).toString).toMap)
    case "null" => Some(Map[String, String]())
  }
})

val splitColList = List("r_split")
val d = ft.select(splitColList.map(c => convertToMapUDF(col(c))): _*)
The r_split column contains data like this:
null
null
As=true, eMsion_New:E=true, HR:E=true, Don:E=true, Hrs=true, PAD:E=true, mog:E=true, N4k:E=true, WY:E=true, AT:E=true, Dt_RFC:E=true, DASH_ALL:E=true, TE:E=true, C_14:E=true, We:E=true, PG:E=true, ZR:E=true, MP:E=true, m2M:E=true, HC:E=true, Nos:E=true,
null
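Incidentally, the sample row has a space after every comma and ends with a comma. `String.split(",")` drops trailing empty strings, so the final comma is harmless, but the leading space stays on every key after the first. The question's mapping expression, pulled out into a plain function (the object and method names are illustrative, not from the question), shows this on a shortened sample:

```scala
object SampleRowDemo {
  // The question's mapping, unchanged: split on ",", then on "=", and pair the two parts.
  // No trimming is done, so " HR:E" keeps its leading space.
  def originalMapping(c: String): Map[String, String] =
    c.split(",").toList
      .map(x => x.split("="))
      .map(a => a(0).toString -> a(1).toString)
      .toMap
}

// Shortened version of the sample row, including the trailing comma.
val m = SampleRowDemo.originalMapping("As=true, HR:E=true,")
```

Here `m` contains the key `" HR:E"` rather than `"HR:E"`, so a lookup by the bare name would miss unless each piece is trimmed.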
Exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3712.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3712.0 (TID 37567, 10.73.35.140, executor 48): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$10845/2054440395: (string) => map<string,string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at line6ec706fd41324bde944dab3ff50b81c6224.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$convertToMapUDF$1(command-1649645762990220:2)
... 14 more
I tried using Option and Some, but that did not solve the problem. The UDF works fine when I run it as a standalone function on the string above.
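A likely explanation, assuming the notebook cell begins with the `def convertToMapUDF` line: the `Caused by` frame points at line 2 of that cell, which would be `val arr = c.split(",")`. A real SQL NULL reaches the lambda as a null `String`, so `split` throws before the `match` is ever evaluated; the standalone test on the sample string never takes that path. Ironically, `s"$c"` renders a null reference as the text "null", so `case "null"` would have matched if execution got that far. A plain-Scala sketch of both behaviours, without Spark (the object and method names are hypothetical):

```scala
object NullInputDemo {
  // Same first step as the UDF body: call split on the raw column value.
  // A null reference makes this throw NullPointerException, reported here as a Left.
  def splitLikeUdf(c: String): Either[String, List[String]] =
    try Right(c.split(",").toList)
    catch { case _: NullPointerException => Left("NullPointerException") }

  // Same scrutinee as the UDF's match: string interpolation turns null into "null".
  def scrutineeLikeUdf(c: String): String = s"$c"
}

val crashed = NullInputDemo.splitLikeUdf(null)        // split on a null reference
val rendered = NullInputDemo.scrutineeLikeUdf(null)   // what the match would have seen
val normal = NullInputDemo.splitLikeUdf("a=1,b=2")    // a non-null value splits normally
```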
【Discussion】:
-
Is "null" a string? Or should it be case null or case _?
-
null is passed in as a string.
-
When I create a sample df using the r_split sample data, the UDF works fine.
-
But it doesn't work if I include actual null values in the DataFrame!
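That fits the trace: the failure comes from real nulls, not from the "null" strings, and wrapping only the result in `Some` cannot help because the input has to be checked before any method is called on it. A minimal null-safe sketch of the UDF body as a plain Scala function follows. The name `NullSafeConvert.convertToMap` is hypothetical, and mapping a real null to an empty map (like the existing "null" case) is an assumption; returning `None` would produce a SQL NULL instead.

```scala
object NullSafeConvert {
  // Hypothetical null-safe version of the question's UDF body (plain Scala, no Spark).
  // Option(c) turns a real null into None before split or trim is ever called.
  def convertToMap(c: String): Option[Map[String, String]] =
    Option(c).map(_.trim) match {
      // Real null or the literal string "null": empty map, as in the original "null" case.
      case None | Some("null") => Some(Map.empty[String, String])
      case Some(s) =>
        // Keep only "key=value" pieces. This drops empty pieces and also replaces the
        // original non-exhaustive match, which could throw MatchError on other input.
        val pairs = s.split(",").map(_.trim).filter(_.contains("=")).map { kv =>
          val i = kv.indexOf('=')
          kv.substring(0, i).trim -> kv.substring(i + 1).trim
        }
        Some(pairs.toMap)
    }
}

val fromNull = NullSafeConvert.convertToMap(null)
val fromNullString = NullSafeConvert.convertToMap("null")
val fromSample = NullSafeConvert.convertToMap("As=true, HR:E=true,")
```

Assuming the usual `org.apache.spark.sql.functions.{col, udf}` imports, this could be wrapped as `udf(NullSafeConvert.convertToMap _)` and applied to `col("r_split")` as in the question. Keeping the logic in a plain function also lets it be tested without a SparkSession.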
Tags: scala apache-spark nullpointerexception user-defined-functions