【发布时间】:2020-10-05 12:16:24
【问题描述】:
我正在使用 Spark 2.3.2。
在我的 Dataframe 中的一列上,我依次执行了许多 spark.sql.functions。如何将这一系列函数包装到用户定义函数 (UDF) 中以使其可重用?
这是我的示例,重点关注一列“columnName”。首先,我正在创建我的测试数据:
val testSchema = new StructType()
.add("columnName", new StructType()
.add("2020-11", LongType)
.add("2020-12", LongType)
)
val testRow = Seq(Row(Row(1L, 2L)))
val testRDD = spark.sparkContext.parallelize(testRow)
val testDF = spark.createDataFrame(testRDD, testSchema)
testDF.printSchema()
/*
root
|-- columnName: struct (nullable = true)
| |-- 2020-11: long (nullable = true)
| |-- 2020-12: long (nullable = true)
*/
testDF.show(false)
/*
+----------+
|columnName|
+----------+
|[1, 2] |
+----------+
*/
以下是应用 Spark SQL 函数的序列(仅作为示例):
val testResult = testDF
.select(explode(split(regexp_replace(to_json(col("columnName")), "[\"{}]", ""), ",")).as("result"))
我未能创建 UDF“myUDF”,这样我在调用时可以获得相同的结果
val testResultWithUDF = testDF.select(myUDF(col("columnName"))
这就是我“想要”做的事情:
def parseAndExplode(spalte: Column): Column = {
explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ",")
}
val myUDF = udf(parseAndExplode _)
testDF.withColumn("udf_result", myUDF(col("columnName"))).show(false)
但它正在抛出异常:
Schema for type org.apache.spark.sql.Column is not supported
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
还尝试使用 Row 作为输入参数,但尝试应用内置 SQL 函数再次失败。
【问题讨论】:
标签: scala apache-spark apache-spark-sql