【问题标题】:Spark DataFrame columns transform to Map type and List of Map Type [duplicate]Spark DataFrame列转换为地图类型和地图类型列表[重复]
【发布时间】:2017-05-28 09:02:09
【问题描述】:

我有如下数据框,如果有人可以帮助我获得以下不同格式的输出,我将不胜感激。

输入:

|customerId|transHeader|transLine|

|1001      |1001aa     |1001aa1  |

|1001      |1001aa     |1001aa2  |

|1001      |1001aa     |1001aa3  |

|1001      |1001aa     |1001aa4  |

|1002      |1002bb     |1002bb1  |

|1002      |1002bb     |1002bb2  |

|1002      |1002bb     |1002bb3  |

|1002      |1002bb     |1002bb4  |

|1003      |1003cc     |1003cc1  |

|1003      |1003cc     |1003cc2  |

|1003      |1003cc     |1003cc3  |

+----------+-----------+---------+

预期输出集 1:

customerId  headerLineMapGroup 

1001              Map(1001aa -> (1001aa1, 1001aa2, 1001aa3, 1001aa4))

1002              Map(1002bb -> (1002bb1, 1002bb2, 1002bb3, 1002bb4))

1003              Map(1003cc -> (1003cc1, 1003cc2, 1003cc3))         

预期输出集 2:

customerId  headerLineListOfMapGroup 

1001        List[   Map(1001aa -> 1001aa1), Map(1001aa ->1001aa2), Map(1001aa ->1001aa3), Map(1001aa ->1001aa4) ]

1002        List[   Map(1002bb -> 1002bb1), Map(1002bb -> 1002bb2), Map(1002bb -> 1002bb3), Map(1002bb -> 1002bb4)]

1003        List[   Map(1003cc -> 1003cc1), Map(1003cc ->1003cc2), Map(1003cc ->1003cc3) ]     

【问题讨论】:

  • 能否请您添加文本数据并删除图像。以便它可以被搜索和复制。

标签: scala apache-spark-sql spark-dataframe rdd


【解决方案1】:

这里是使用 udf 的解决方案。

    val spark = SparkSession
    .builder()
    .master("local")
    .appName("ParquetAppendMode")
    .getOrCreate()

    import spark.implicits._

    val data = spark.sparkContext.parallelize(Seq(
      (1001, "1001aa","1001aa1"),
      (1001, "1001aa","1001aa2"),
      (1001, "1001aa","1001aa3")
  )).toDF("customerId", "transHeader", "transLine")

  val toMap = udf((header: String, line: Seq[String]) => {
    Map(header -> line)
  })
  val toMapList = udf((header: String, line: Seq[String]) => {
    line.map(l => Map(header -> l)).toList
  })

  val grouped = data.groupBy("customerId", "transHeader").agg(collect_list("transLine").alias("transLine"))

  grouped.withColumn("headerLineMapGroup", toMap($"transHeader", $"transLine"))
      .drop("transHeader", "transLine")
      .show(false)

  grouped.withColumn("headerLineMapGroupList", toMapList($"transHeader", $"transLine"))
    .drop("transHeader", "transLine")
    .show(false)

希望这会有所帮助!

【讨论】:

  • HI Shankar,非常感谢。是否有可能只有 RDD 解决方案?像这样。我能够获得 headerLineMapGroup 但不能获得 headerLineListOfMapGroup。 val SparkEvenOutput= my_dataframe_data_Table.rdd.map ( row=>{ ( row.getAs[Int](0), ( row.getAs[String](1), row.getAs[String](2) ) }).groupByKey ().map(value=>{ val customerId= value._1 val headerLineMapGroup = value._2.groupBy(._1).map { case (k,v) => (k,v.map( i>._2))} 行(customerId,headerLineMapGroup) })
  • @Shankar,我又发布了一个问题,请您帮忙解决一下。 stackoverflow.com/questions/44224639/…
  • 如果这些有帮助,请点赞并标记为答案,好的,我一定会再看一个问题。
  • 这是上述代码的更正代码。 val SparkEvenOutput= data.rdd.map (row=>{( row.getAs[String](0), ( row.getAs[String](1), row.getAs[String](2) ) )}) .groupByKey ().map(value=>{ val customerId= value._1 val headerLineMapGroup = value._2.groupBy(._1).map { case (k,v) => (k,v.map( i>._2))} 行(customerId,headerLineMapGroup) })
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-09-19
  • 2018-03-15
  • 1970-01-01
  • 2016-02-26
  • 2019-03-23
  • 2018-02-20
  • 2020-11-20
相关资源
最近更新 更多