【问题标题】:How to coalesce array columns in Spark dataframe如何合并 Spark 数据框中的数组列
【发布时间】:2016-12-29 07:12:06
【问题描述】:

假设我有以下数据框:

id | myStruct
___________________
1  | [val1, val2]
___________________
2  | [val3, val4]
___________________
1  | [val5, val6]

我想将所有共享相同 id 的 myStructs 分组到 myStructs 的数组列中。所以,上面的数据框应该变成

id | myStruct
__________________________________
1  | [[val1, val2], [val5, val6]]
__________________________________
2  | [[val3, val4]]

我知道有一个数组函数,但它只会将每一列转换为大小为 1 的数组。如何合并生成的数组?

我在 Scala shell 中使用 Spark 1.5.2。

鉴于我使用的是 Spark 1.5.2,我无法使用 collect_list 或 collect_set。

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    如果您使用 Spark 1.5 并且无法升级,最简单的选项是RDD.groupByKey

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    val rows = df.rdd
      .map { case Row(id, myStruct) => (id, myStruct) }
      .groupByKey
      .map { case (id, myStructs) => Row(id, myStructs) }
    
    val schema =  StructType(Seq(
      df.schema("id"),
      StructField("myStructs", ArrayType(df.schema("myStruct").dataType))
    ))
    
    sqlContext.createDataFrame(rows, schema)
    

    可以通过像这样转换为“对”来概括:

    import org.apache.spark.sql.functions.struct
    
    df.select(
      struct($"key1", $"key2", ..., $"keyn").alias("id"),
      struct($"val1", $"val2", ..., $"valn").alias("myStruct")
    )
    

    【讨论】:

      【解决方案2】:

      在 Spark 2.0+ 中,您可以使用 collect_list 来完成此操作:

      scala> val df = sc.parallelize(Seq((1, ("v1", "v2")), (2, ("v3", "v4")), (1, ("v5", "v6")))).toDF("id", "myStruct")
      df: org.apache.spark.sql.DataFrame = [id: int, myStruct: struct<_1: string, _2: string>]
      
      scala> df.show
      +---+--------+
      | id|myStruct|
      +---+--------+
      |  1| [v1,v2]|
      |  2| [v3,v4]|
      |  1| [v5,v6]|
      +---+--------+
      
      scala> df.groupBy("id").agg(collect_list($"myStruct")).show
      +---+----------------------+                                                    
      | id|collect_list(myStruct)|
      +---+----------------------+
      |  1|    [[v1,v2], [v5,v6]]|
      |  2|             [[v3,v4]]|
      +---+----------------------+
      

      但是在 Spark 1.5.2 中你需要这样的东西:

      scala> val df2 = df.select($"id", $"myStruct._1".as("p1"), $"myStruct._2".as("p2"))
      df2: org.apache.spark.sql.DataFrame = [id: int, p1: string, p2: string]
      
      scala> df2.show
      +---+---+---+
      | id| p1| p2|
      +---+---+---+
      |  1| v1| v2|
      |  2| v3| v4|
      |  1| v5| v6|
      +---+---+---+
      
      scala> val rdd = df2.rdd.map{case Row(id: Int, p1: String, p2: String) => (id, (p1, p2))}
      rdd: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[47] at map at <console>:32
      
      scala> val finalDF = rdd.groupByKey.map(x => (x._1, x._2.toList)).toDF("id", "structs")
      finalDF: org.apache.spark.sql.DataFrame = [id: int, structs: array<struct<_1:string,_2:string>>]
      
      scala> finalDF.show
      +---+------------------+
      | id|           structs|
      +---+------------------+
      |  1|[[v1,v2], [v5,v6]]|
      |  2|         [[v3,v4]]|
      +---+------------------+
      

      【讨论】:

      • 正确,但我认为collect_list 仅在 Spark 1.6.0 中引入,因此除非升级,否则 OP 无法使用它(Spark 1.5.2)...
      • 是的,很遗憾我无法升级我的 spark,所以我无法使用 collect_list。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-10-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-23
      • 2020-01-20
      • 1970-01-01
      相关资源
      最近更新 更多