【问题标题】:Scala -Create Map from Spark DataFrameScala - 从 Spark DataFrame 创建地图
【发布时间】:2020-08-09 20:11:09
【问题描述】:

我有一个 Spark DataFrame,我想创建 Map 并将值存储为 Map[String, Map[String, String]]。 我不知道该怎么做,任何帮助将不胜感激。

下面是输入输出格式:

输入:

    +-----------------+------------+---+--------------------------------+
    |relation         |obj_instance|obj|map_value                       |
    +-----------------+------------+---+--------------------------------+
    |Start~>HInfo~>Mnt|Mnt         |Mnt|[Model -> 2000, Version -> 1.0] |
    |Start~>HInfo~>Cbl|Cbl-3       |Cbl|[VSData -> XYZVN, Name -> Smart]|
    +-----------------+------------+---+--------------------------------+

输出:

    Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
    Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))  

代码,我正在尝试但没有成功:

   var resultMap: Map[Any, Any] = Map()
   groupedDataSet.foreach( r => {
     val key1 = "relation".toString
     val value1 = r(0).toString
     val key2 = "obj_instance".toString
     val value2 = r(1).toString
     val key3 = "obj".toString
     val value3 = r(2).toString
     val key4 = r(2).toString
     val value4 = r(3)

     resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
   })
     resultMap.foreach(println)

请帮忙。

下面是创建Test DataFrame和Map Column的代码

            import org.apache.spark.SparkConf
            import org.apache.spark.sql.{Column, SparkSession}
            import org.apache.spark.sql.functions._

            object DFToMap extends App {

              //Creating SparkSession
              lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
                .setIfMissing("spark.master", "local[*]")
              lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

              import sparkSession.implicits._

    // Creating raw DataFrame
          val rawTestDF = Seq(("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"), ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
            ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"), ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0"))
            .toDF("relation", "obj_instance", "obj", "key", "value")

          rawTestDF.show(false)

    val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }

          val groupedDataSet = rawTestDF.groupBy("relation", "obj_instance", "obj").agg(collect_list(map(col("key"), col("value"))) as "map_value_temp").withColumn("map_value", joinTheMap(col("map_value_temp")))
            .drop("map_value_temp")

          groupedDataSet.show(false)  //This is the Input DataFrame.


            }

地图的最终输出 Json:

    [{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}
    {"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj:"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]

注意:我不想使用任何 Spark groupBy、pivot、agg,因为 Spark 流不支持多重聚合。因此,我想用纯 Scala 代码来获得它。请帮忙。

【问题讨论】:

  • 你可以发布你的数据框架构并创建数据框吗?
  • 嗨 Srinivas,我创建了代码/类来获取此输出,如果您可以将整个数据框转换为地图并最终必须转换为 Json,请提供帮助。
  • 当然,如果你给我最终的 json 输出和数据框的输入,我可以帮助你。
  • 你能解释一下这个 - 印度 -> 地图(城市 -> 德里,大小 -> L)你是如何绘制地图的
  • 也在这里发布你的最终 json 输出?

标签: list scala dataframe apache-spark dictionary


【解决方案1】:

创建自定义 UDF 以解析和生成 JSON 格式的数据。

  import org.json4s.native.JsonMethods._
  import org.json4s._
  import org.json4s.JsonDSL._

  def toJson(relation:String,obj_instance: String,obj: String,map_value: Map[String,String]) = {
    compact(render(
      JObject("relation" -> JString(relation),
        "obj_instance" -> JString(obj_instance),
        "obj" -> JString(obj),
        obj -> map_value)))
  }

  import org.apache.spark.sql.functions._
  val createJson = udf(toJson _)
  val df = Seq(("Start~>HInfo~>Mnt","Mnt","Mnt",Map("Model" -> "2000", "Version" -> "1.0")),("Start~>HInfo~>Cbl","Cbl-3","Cbl",Map("VSData" -> "XYZVN", "Name" -> "Smart"))).toDF("relation","obj_instance","obj","map_value")
  df.select(createJson($"relation",$"obj_instance",$"obj",$"map_value").as("json_map")).show(false)


+-----------------------------------------------------------------------------------------------------------+
|json_map                                                                                                   |
+-----------------------------------------------------------------------------------------------------------+
|{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}   |
|{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj":"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}|
+-----------------------------------------------------------------------------------------------------------+

【讨论】:

  • 嗨 Srinivas,感谢您的及时回答,但如果您查看更新后的问题,我的输入数据框有一个 Map 列,因此这可能无济于事,因为它只需要相同的数据类型。
  • 将映射数据类型列转换为字符串并传递给映射火花函数。
  • 我在印度国家有价值观地图的答案中也做了同样的事情。
  • 之前我曾尝试将 Map Column 转换为 String 并获得了预期的输出,但是当我尝试转换为 Json 时,它与下面给出的预期不同,因此为解决方案而苦苦挣扎。 {"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":"[Model -> 2000, Version -> 1.0]}跨度>
  • 你是如何将 map 转换为 json 的?
猜你喜欢
  • 1970-01-01
  • 2019-07-08
  • 1970-01-01
  • 2016-02-25
  • 1970-01-01
  • 2018-04-17
  • 2021-08-20
  • 2020-02-13
  • 1970-01-01
相关资源
最近更新 更多