【问题标题】:Convert a column from StringType to Json (object)将列从 StringType 转换为 Json(对象)
【发布时间】:2020-11-20 20:52:21
【问题描述】:

这是一个示例数据

val df4 = sc.parallelize(List(
  ("A1",45, "5", 1, 90),
  ("A2",60, "1", 1, 120),
  ("A6", 30, "9", 1, 450),
  ("A7", 89, "7", 1, 333),
  ("A7", 89, "4", 1, 320),
  ("A2",60, "5", 1, 22),
  ("A1",45, "22", 1, 1)
)).toDF("CID","age", "children", "marketplace_id","value")

感谢@Shu 提供这段代码

val df5 = df4.selectExpr("CID","""to_json(named_struct("id", children)) as item""", "value", "marketplace_id")
+---+-----------+-----+--------------+
|CID|item       |value|marketplace_id|
+---+-----------+-----+--------------+
|A1 |{"id":"5"} |90   |1             |
|A2 |{"id":"1"} |120  |1             |
|A6 |{"id":"9"} |450  |1             |
|A7 |{"id":"7"} |333  |1             |
|A7 |{"id":"4"} |320  |1             |
|A2 |{"id":"5"} |22   |1             |
|A1 |{"id":"22"}|1    |1             |
+---+-----------+-----+--------------+

当你做df5.dtypes

(CID,StringType), (item,StringType), (value,IntegerType), (marketplace_id,IntegerType)

列项是字符串类型,有没有办法可以是json/object类型(如果有的话)?

编辑 1: 我将在这里描述我想要实现的目标,以上两个步骤保持不变。

val w = Window.partitionBy("CID").orderBy(desc("value"))
val sorted_list = df5.withColumn("item", collect_list("item").over(w)).groupBy("CID").agg(max("item") as "item")

输出:

+---+-------------------------+
|CID|item                     |
+---+-------------------------+
|A6 |[{"id":"9"}]             |
|A2 |[{"id":"1"}, {"id":"5"}] |
|A7 |[{"id":"7"}, {"id":"4"}] |
|A1 |[{"id":"5"}, {"id":"22"}]|
+---+-------------------------+

现在[ ] 中的任何内容都是一个字符串。这导致我们正在使用的工具之一出现问题。

对不起,对不起,我是 scala 的新手,如果这是一个基本问题,请火花。

【问题讨论】:

  • 原生spark类型是struct,没有json类型。
  • 有什么解决办法吗?我只是不希望它是 StringType。
  • 正如@Lamanus 所说,有一个特殊的struct 类型用于表示数据框中的复杂对象。你想用那个 JSON 实现什么?似乎里面的所有数据都已经是记录的一部分,所以我看不到将item 作为 JSON 的意义
  • 其中一个工具要求列格式为 {"items": "[{"id": "value"},{"id": "value2"},{"id": "value3"}]"}
  • 看看XY Problem。请更改问题以说明输入和预期输出的实际问题。

标签: scala apache-spark


【解决方案1】:

使用struct类型存储json数据,检查下面的代码。

scala> dfa
.withColumn("item_without_json",struct($"cid".as("id")))
.withColumn("item_as_json",to_json($"item_without_json"))
.show(false)

+---+-----------+-----+--------------+-----------------+------------+
|CID|item       |value|marketplace_id|item_without_json|item_as_json|
+---+-----------+-----+--------------+-----------------+------------+
|A1 |{"id":"A1"}|90   |1             |[A1]             |{"id":"A1"} |
|A2 |{"id":"A2"}|120  |1             |[A2]             |{"id":"A2"} |
|A6 |{"id":"A6"}|450  |1             |[A6]             |{"id":"A6"} |
|A7 |{"id":"A7"}|333  |1             |[A7]             |{"id":"A7"} |
|A7 |{"id":"A7"}|320  |1             |[A7]             |{"id":"A7"} |
|A2 |{"id":"A2"}|22   |1             |[A2]             |{"id":"A2"} |
|A1 |{"id":"A1"}|1    |1             |[A1]             |{"id":"A1"} |
+---+-----------+-----+--------------+-----------------+------------+

【讨论】:

  • item_as_json 仍然是 StringType。
  • spark 没有 json 数据类型,而是可以存储与结构类型相同的数据。检查我添加了两列,一列是 item_without_json,另一列转换为 item_without_json 列到 json item_as_json
  • 感谢您抽出时间斯里尼瓦斯。我会找到解决方法或改变我的方法,或者如你所说,如果我可以使用 struct 来实现,我会尝试。
  • 如果你想将json作为对象,你需要将json转换为spark原生数据类型..即structmap类型。
【解决方案2】:

根据您将数据集转换为您将使用的 json 所做的评论:

df4
  .select(collect_list(struct($"CID".as("id"))).as("items"))
  .write()
  .json(path)

输出将如下所示:

{"items":[{"id":"A1"},{"id":"A2"},{"id":"A6"},{"id":"A7"}, ...]}

如果您需要将内存中的内容传递给函数,请使用 toJSON 而不是 toJSON

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-20
    • 2019-12-27
    • 2021-02-10
    • 2016-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多