【发布时间】:2019-02-13 12:39:21
【问题描述】:
当我创建如上所示的 UDF 函数时,我得到任务序列化错误。仅当我使用spark-submit 在集群部署模式下运行代码时才会出现此错误。但是,它在 spark-shell 中运行良好。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
def mfnURL(arr: WrappedArray[String]): String = {
val filterArr = arr.filterNot(_ == null)
if (filterArr.length == 0)
return null
else {
filterArr.groupBy(identity).maxBy(_._2.size)._1
}
}
val mfnURLUDF = udf(mfnURL _)
def windowSpec = Window.partitionBy("nodeId", "url", "typology")
val result = df.withColumn("count", count("url").over(windowSpec))
.orderBy($"count".desc)
.groupBy("nodeId","typology")
.agg(
first("url"),
mfnURLUDF(collect_list("source_url")),
min("minTimestamp"),
max("maxTimestamp")
)
我尝试添加spark.udf.register("mfnURLUDF",mfnURLUDF),但没有解决问题。
【问题讨论】:
-
优秀的帖子。这有帮助吗? placeiq.com/2017/11/…
-
您遇到了什么异常?您能否提供完整的堆栈跟踪? Spark 在跟踪中提供了序列化问题的原因
-
请不要在 Scala 中返回
null。拜托拜托。 -
@erip:正确的方法是什么?
-
你可以使用
Option[T]。
标签: scala apache-spark apache-spark-sql user-defined-functions