【问题标题】:How to map struct in DataFrame to case class?如何将 DataFrame 中的结构映射到案例类?
【发布时间】:2017-09-03 20:35:27
【问题描述】:

在我的应用程序的某个时刻,我有一个 DataFrame,其中包含一个从案例类创建的 Struct 字段。现在我想将其转换/映射回案例类类型:

import spark.implicits._
case class Location(lat: Double, lon: Double)

scala> Seq((10, Location(35, 25)), (20, Location(45, 35))).toDF
res25: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<lat: double, lon: double>]

scala> res25.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- lon: double (nullable = false)

基本的:

res25.map(r => {
   Location(r.getStruct(1).getDouble(0), r.getStruct(1).getDouble(1))
}).show(1)

看起来很脏 有没有更简单的方法?

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql apache-spark-2.0


    【解决方案1】:

    在Spark 1.6+中,如果要保留保留的类型信息,则使用Dataset(DS),而不是DataFrame(DF)。

    import spark.implicits._
    case class Location(lat: Double, lon: Double)
    
    scala> Seq((10, Location(35, 25)), (20, Location(45, 35))).toDS
    res25: org.apache.spark.sql.Dataset[(Int, Location)] = [_1: int, _2: struct<lat: double, lon: double>]
    
    scala> res25.printSchema
    root
     |-- _1: integer (nullable = false)
     |-- _2: struct (nullable = true)
     |    |-- lat: double (nullable = false)
     |    |-- lon: double (nullable = false)
    

    它会给你Dataset[(Int, Location)]。现在,如果你想再次回到它的案例类起源,那么只需这样做:

    scala> res25.map(r => r._2).show(1)
    +----+----+
    | lat| lon|
    +----+----+
    |35.0|25.0|
    +----+----+
    

    但是,如果你想坚持使用 DataFrame API,由于它是动态类型的,那么你必须这样编码:

    scala> res25.select("_2.*").map(r => Location(r.getDouble(0), r.getDouble(1))).show(1)
    +----+----+
    | lat| lon|
    +----+----+
    |35.0|25.0|
    +----+----+
    

    【讨论】:

    • 其实我也需要访问其他行属性,所以需要直接映射,不需要选择步骤。如何做到这一点?
    • 目前,除了先使用select,然后再使用map,别无他法。或者,如果可能,您可以使用Dataset
    【解决方案2】:

    您还可以使用Row 中的提取器模式,这会给您类似的结果,使用更惯用的scala:

    scala> res25.map { row =>
      (row: @unchecked) match {
        case Row(a: Int, Row(b: Double, c: Double)) => (a, Location(b, c))
      }
    }
    res26: org.apache.spark.sql.Dataset[(Int, Location)] = [_1: int, _2: struct<lat: double, lon: double>]
    scala> res26.collect()
    res27: Array[(Int, Location)] = Array((10,Location(35.0,25.0)), (20,Location(45.0,35.0)))
    

    【讨论】:

      【解决方案3】:

      我认为其他答案很到位,但也许他们可能需要其他措辞。

      简而言之,不能在 DataFrames 中使用案例类,因为它们不区分案例类并使用 RowEncoder 将内部 SQL 类型映射到 Row

      正如其他答案所说,您必须使用as 运算符将基于RowDataFrame 转换为Dataset

      val df = Seq((10, Location(35, 25)), (20, Location(45, 35))).toDF
      scala> val ds = df.as[(Int, Location)]
      ds: org.apache.spark.sql.Dataset[(Int, Location)] = [_1: int, _2: struct<lat: double, lon: double>]
      
      scala> ds.show
      +---+-----------+
      | _1|         _2|
      +---+-----------+
      | 10|[35.0,25.0]|
      | 20|[45.0,35.0]|
      +---+-----------+
      
      scala> ds.printSchema
      root
       |-- _1: integer (nullable = false)
       |-- _2: struct (nullable = true)
       |    |-- lat: double (nullable = false)
       |    |-- lon: double (nullable = false)
      
      scala> ds.map[TAB pressed twice]
      
      def map[U](func: org.apache.spark.api.java.function.MapFunction[(Int, Location),U],encoder: org.apache.spark.sql.Encoder[U]): org.apache.spark.sql.Dataset[U]
      def map[U](func: ((Int, Location)) => U)(implicit evidence$6: org.apache.spark.sql.Encoder[U]): org.apache.spark.sql.Dataset[U]
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-10-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多