【发布时间】:2020-11-20 20:52:21
【问题描述】:
这是一个示例数据
val df4 = sc.parallelize(List(
("A1",45, "5", 1, 90),
("A2",60, "1", 1, 120),
("A6", 30, "9", 1, 450),
("A7", 89, "7", 1, 333),
("A7", 89, "4", 1, 320),
("A2",60, "5", 1, 22),
("A1",45, "22", 1, 1)
)).toDF("CID","age", "children", "marketplace_id","value")
感谢@Shu 提供这段代码
val df5 = df4.selectExpr("CID","""to_json(named_struct("id", children)) as item""", "value", "marketplace_id")
+---+-----------+-----+--------------+
|CID|item |value|marketplace_id|
+---+-----------+-----+--------------+
|A1 |{"id":"5"} |90 |1 |
|A2 |{"id":"1"} |120 |1 |
|A6 |{"id":"9"} |450 |1 |
|A7 |{"id":"7"} |333 |1 |
|A7 |{"id":"4"} |320 |1 |
|A2 |{"id":"5"} |22 |1 |
|A1 |{"id":"22"}|1 |1 |
+---+-----------+-----+--------------+
当你做df5.dtypes
(CID,StringType), (item,StringType), (value,IntegerType), (marketplace_id,IntegerType)
列项是字符串类型,有没有办法可以是json/object类型(如果有的话)?
编辑 1: 我将在这里描述我想要实现的目标,以上两个步骤保持不变。
val w = Window.partitionBy("CID").orderBy(desc("value"))
val sorted_list = df5.withColumn("item", collect_list("item").over(w)).groupBy("CID").agg(max("item") as "item")
输出:
+---+-------------------------+
|CID|item |
+---+-------------------------+
|A6 |[{"id":"9"}] |
|A2 |[{"id":"1"}, {"id":"5"}] |
|A7 |[{"id":"7"}, {"id":"4"}] |
|A1 |[{"id":"5"}, {"id":"22"}]|
+---+-------------------------+
现在[ ] 中的任何内容都是一个字符串。这导致我们正在使用的工具之一出现问题。
对不起,对不起,我是 scala 的新手,如果这是一个基本问题,请火花。
【问题讨论】:
-
原生spark类型是struct,没有json类型。
-
有什么解决办法吗?我只是不希望它是 StringType。
-
正如@Lamanus 所说,有一个特殊的
struct类型用于表示数据框中的复杂对象。你想用那个 JSON 实现什么?似乎里面的所有数据都已经是记录的一部分,所以我看不到将item作为 JSON 的意义 -
其中一个工具要求列格式为 {"items": "[{"id": "value"},{"id": "value2"},{"id": "value3"}]"}
-
看看XY Problem。请更改问题以说明输入和预期输出的实际问题。
标签: scala apache-spark