【问题标题】:Spark UDF for StructType / RowStructType / Row 的 Spark UDF
【发布时间】:2017-08-13 09:29:27
【问题描述】:

我在 spark Dataframe 中有一个“StructType”列,其中包含一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用 UDF 处理它吗?或者有什么替代方案?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

看来我需要一个 Row 类型的 UDF,类似于

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported

这是有道理的,因为 Spark 不知道返回类型的架构。 不幸的是,udf.register 也失败了:

spark.udf.register("foo", (x:Row)=> Row, sub_schema)
     <console>:30: error: overloaded method value register with alternatives: ...

【问题讨论】:

  • 为什么 Row 类型的 UDF 可以工作? Spark的UDF类型是如何推断的?

标签: scala apache-spark udf


【解决方案1】:

原来你可以将结果模式作为第二个 UDF 参数传递:

val u =  udf((x:Row) => x, sub_schema)

【讨论】:

  • 如果要使用x的架构怎么办?假设 x 是 GenericRowWithSchema
【解决方案2】:

你在正确的轨道上。在这种情况下,UDF 将使您的生活变得轻松。正如您已经遇到的那样,UDF 不能返回 spark 不知道的类型。所以基本上你需要返回一些火花可以很容易序列化的东西。它可能是 case class 或者你可以返回一个像 (Seq[Int], String) 这样的元组。所以这是你的代码的修改版本:

def main(args: Array[String]): Unit = {
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._
  val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
  val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
  val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
  val rd = spark.sparkContext.parallelize(data)
  val df = spark.createDataFrame(rd, schema)

  df.printSchema()
  df.show(false)

  val mapArray = (subRows: Row) => {
    // I prefer reading values from row by specifying column names, you may use index also
    val col1 = subRows.getAs[Seq[Int]]("col1")
    val mappedCol1 = col1.map(x => x * x) // Use map based on your requirements
    (mappedCol1, subRows.getAs[String]("col2")) // now mapping is done for col2
  }
  val mapUdf = udf(mapArray)

  val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable")))
  newDf.show(false)
  newDf.printSchema()
}

请查看这些链接,这些链接可能会让您更深入地了解。

  1. 关于使用复杂模式的最全面答案:https://stackoverflow.com/a/33850490/4046067
  2. Spark 支持的数据类型:https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

【讨论】:

    【解决方案3】:

    是的,您可以使用 UDF 执行此操作。为简单起见,我以案例类为例,并通过向每个值添加 2 来更改数组:

    case class Root(subtable: Subtable)
    case class Subtable(col1: Seq[Int], col2: String)
    
    val df = spark.createDataFrame(Seq(
      Root(Subtable(Seq(1, 2, 3), "toto")),
      Root(Subtable(Seq(10, 20, 30), "tata"))
    ))
    
    val myUdf = udf((subtable: Row) =>
      Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1))
    )
    val result = df.withColumn("subtable_new", myUdf(df("subtable")))
    result.printSchema()
    result.show(false)
    

    将打印:

    root
     |-- subtable: struct (nullable = true)
     |    |-- col1: array (nullable = true)
     |    |    |-- element: integer (containsNull = false)
     |    |-- col2: string (nullable = true)
     |-- subtable_new: struct (nullable = true)
     |    |-- col1: array (nullable = true)
     |    |    |-- element: integer (containsNull = false)
     |    |-- col2: string (nullable = true)
    
    +-------------------------------+-------------------------------+
    |subtable                       |subtable_new                   |
    +-------------------------------+-------------------------------+
    |[WrappedArray(1, 2, 3),toto]   |[WrappedArray(3, 4, 5),toto]   |
    |[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]|
    +-------------------------------+-------------------------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-05-21
      • 2021-03-08
      • 1970-01-01
      • 2017-12-13
      • 2020-10-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多