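[Posted]: 2020-08-09 20:11:09
[Question]:
I have a Spark DataFrame and I want to turn each row into a Map, storing the values as Map[String, Map[String, String]]. I don't know how to do this; any help would be appreciated.
The input and output formats are shown below.
Input: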
+-----------------+------------+---+--------------------------------+
|relation |obj_instance|obj|map_value |
+-----------------+------------+---+--------------------------------+
|Start~>HInfo~>Mnt|Mnt |Mnt|[Model -> 2000, Version -> 1.0] |
|Start~>HInfo~>Cbl|Cbl-3 |Cbl|[VSData -> XYZVN, Name -> Smart]|
+-----------------+------------+---+--------------------------------+
Output:
Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))
The code I am trying, without success:
var resultMap: Map[Any, Any] = Map()
groupedDataSet.foreach( r => {
  val key1 = "relation".toString
  val value1 = r(0).toString
  val key2 = "obj_instance".toString
  val value2 = r(1).toString
  val key3 = "obj".toString
  val value3 = r(2).toString
  val key4 = r(2).toString
  val value4 = r(3)
  resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
})
resultMap.foreach(println)
Please help.
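One likely reason the attempt above does not produce the expected result: `foreach` on a DataFrame runs its closure on the executors, so updates to the driver-side `var resultMap` are not visible afterwards. In addition, every row writes the same `relation`, `obj_instance` and `obj` keys, so a single merged map could only keep the last row. A minimal sketch of an alternative, assuming the rows fit on the driver, builds one map per row and keeps the maps in a list. The helper `rowToMap` and the object name `RowToMap` are made up for illustration, and plain tuples stand in for Spark `Row`s so the snippet runs without Spark.

```scala
object RowToMap {
  // Hypothetical helper: builds the per-row map from the four column values.
  def rowToMap(
      relation: String,
      objInstance: String,
      obj: String,
      mapValue: Map[String, String]
  ): Map[String, Any] =
    Map(
      "relation"     -> relation,
      "obj_instance" -> objInstance,
      "obj"          -> obj,
      obj            -> mapValue // the nested map is keyed by the value of `obj`
    )

  // Assumption: stand-in for the collected rows of groupedDataSet.
  val rows: Seq[(String, String, String, Map[String, String])] = Seq(
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", Map("Model" -> "2000", "Version" -> "1.0")),
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", Map("VSData" -> "XYZVN", "Name" -> "Smart"))
  )

  // One map per row, kept as a list instead of being merged into a single map.
  val result: Seq[Map[String, Any]] =
    rows.map { case (rel, inst, obj, mv) => rowToMap(rel, inst, obj, mv) }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

With Spark on the classpath, the same helper could presumably be applied as `groupedDataSet.collect().map(r => rowToMap(r.getString(0), r.getString(1), r.getString(2), r.getMap[String, String](3).toMap))`. That line is untested here, and `collect()` is only available on a batch DataFrame.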
Below is the code that creates the test DataFrame and the map column:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {
  // Creating SparkSession
  lazy val conf = new SparkConf()
    .setAppName("df-to-map")
    .set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  import sparkSession.implicits._

  // Creating raw DataFrame
  val rawTestDF = Seq(
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"),
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0")
  ).toDF("relation", "obj_instance", "obj", "key", "value")
  rawTestDF.show(false)

  // Merges the collected single-entry maps of a group into one map
  val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }

  val groupedDataSet = rawTestDF
    .groupBy("relation", "obj_instance", "obj")
    .agg(collect_list(map(col("key"), col("value"))) as "map_value_temp")
    .withColumn("map_value", joinTheMap(col("map_value_temp")))
    .drop("map_value_temp")
  groupedDataSet.show(false) // This is the input DataFrame.
}
Final JSON output for the maps:
[{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}},
{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj":"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]
Note: I don't want to use any Spark groupBy, pivot, or agg, because Spark streaming does not support multiple aggregations. So I want to get this with pure Scala code. Please help.
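As a rough sketch of what "pure Scala" could look like for this requirement, the snippet below groups the raw (relation, obj_instance, obj, key, value) rows with ordinary Scala collections and renders the resulting maps as a JSON array by hand. Everything in it is an assumption for illustration: the object name `MapsToJson`, the helpers `quote` and `toJson`, and the idea that the rows are already available as plain tuples (for example after being brought to the driver). The hand-written `toJson` escapes only quotes, backslashes and newlines; a JSON library would be the safer choice for real data.

```scala
object MapsToJson {
  // Assumption: raw rows as plain tuples, mirroring the columns of rawTestDF.
  val raw: Seq[(String, String, String, String, String)] = Seq(
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"),
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0")
  )

  // Group with Scala collections (not Spark's groupBy) and build one map per group.
  val grouped: Seq[Map[String, Any]] =
    raw
      .groupBy { case (rel, inst, obj, _, _) => (rel, inst, obj) }
      .toSeq
      .map { case ((rel, inst, obj), rs) =>
        val attrs: Map[String, String] = rs.map { case (_, _, _, k, v) => k -> v }.toMap
        Map[String, Any]("relation" -> rel, "obj_instance" -> inst, "obj" -> obj, obj -> attrs)
      }

  // Wraps a string in double quotes, escaping quotes, backslashes and newlines only.
  private def quote(s: String): String =
    "\"" + s.map {
      case '"'  => "\\\""
      case '\\' => "\\\\"
      case '\n' => "\\n"
      case c    => c.toString
    }.mkString + "\""

  // Renders nested maps and sequences as JSON; every leaf value becomes a JSON string.
  def toJson(value: Any): String = value match {
    case m: Map[_, _] =>
      m.map { case (k, v) => quote(k.toString) + ":" + toJson(v) }.mkString("{", ",", "}")
    case xs: Seq[_] => xs.map(toJson).mkString("[", ",", "]")
    case other      => quote(other.toString)
  }

  def main(args: Array[String]): Unit = println(toJson(grouped))
}
```

The order of the groups in the output is not guaranteed, since the collections `groupBy` returns an unordered map. In a streaming job this logic would have to run somewhere the rows of a batch are available as ordinary Scala values, which is a design decision this sketch does not cover.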
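[Discussion]:
-
Can you post your DataFrame schema and the code that creates the DataFrame?
-
Hi Srinivas, I have added the code/class that produces this output. Please help if you can convert the whole DataFrame to a Map, which finally has to be converted to JSON.
-
Sure, if you give me the final JSON output and the input DataFrame, I can help you.
-
Can you explain this: India -> Map(City -> Delhi, Size -> L)? How did you build that map?
-
Can you also post your final JSON output here?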
Tags: list scala dataframe apache-spark dictionary