【问题标题】:What data type should I use for tuple in Spark Dataframe udf?我应该为 Spark Dataframe udf 中的元组使用什么数据类型?
【发布时间】:2021-01-08 13:10:30
【问题描述】:

输入:

val df = Seq((10, (35, 25))).toDF("id", "scorePair")
df.show
+---+---------+
| id|scorePair|
+---+---------+
| 10| [35, 25]|
+---+---------+

预期输出:

+---+-----------+
| id|totalScore |
+---+-----------+
| 10|         60|
+---+-----------+

想做这样的事情,但是不接受Row类型:

// error
val add = udf((row: Row) => {row match {case (a: Int, b: Int) => a + b}})
df.withColumn("totalScore", add(col("scorePair")))

为什么Row类型是不正确的思考

"Dataframe 是 Dataset[Row] 的别名"

?

我应该使用什么类型?我怎样才能实现它?


  • 我强调 Row 类型,因为至少我设法通过以下方式使用 Row(将列的每个单元格视为 Row ) 来实现:
val add = udf((rows: Seq[Row]) => {rows.map {case Row(a: Int, b: Int) => a + b}})
df.groupBy("id").agg(collect_list("scorePair") as "pairSeq").withColumn("totalScore1", add(col("pairSeq"))).select(col("id"), explode(col("totalScore1")) as "totalScore").show
+---+----------+
| id|totalScore|
+---+----------+
| 10|        60|
+---+----------+

但这真的不干净!

【问题讨论】:

  • 我在尝试的尝试中实际上犯了一个错误!如果我从val add = udf((row: Row) => {row match {case (a: Int, b: Int) => a + b}}) 更改为val add = udf((row: Row) => {row match {case Row(a: Int, b: Int) => a + b}}),它应该是正确的。这也回答了关于 Row 类型的问题,所以是的,它是 Row 类型,一切都是一致的。所以我正在考虑结束这个问题。

标签: scala apache-spark apache-spark-sql user-defined-functions


【解决方案1】:

您可以使用row.getAs[Int](0)row.get(0).asInstanceOf[Int]row.getInt(0)从行中获取值

val df = Seq(
  (10, (35, 25))
).toDF("id", "scorePair")


val add = udf((row: Row) => {row.getInt(0) + row.getInt(1)})

df.withColumn("totalScore", add($"scorePair")).show(false)

df.select($"id", $"scorePair._1" + $"scorePair._2" as "totalScore").show(false)

输出:

+---+----------+
|id |totalScore|
+---+----------+
|10 |60        |
+---+----------+

【讨论】:

    【解决方案2】:

    aggregate 函数是对 ArrayType 列中的所有数字求和的最简单方法。 This post 有一个完整的例子。这是sn-p:

    val resDF = df.withColumn(
      "totalScore",
      aggregate(
        col("scorePair"),
        lit(0),
        (col1: Column, col2: Column) => col1 + col2
      )
    )
    

    您希望尽可能避免使用 UDF。此解决方案仅适用于 Spark 3+。

    【讨论】:

    • 但是根据提供的数据,scorePair 是 struct 类型。
    猜你喜欢
    • 1970-01-01
    • 2019-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多