【发布时间】:2021-07-19 23:31:00
【问题描述】:
我正在使用 scala 2.12 spark 3.0.0。我必须创建一个单个 json 字符串,其中每个属性来自不同的表,并将生成的 json 保存在另一个数据框中,即(在此示例中仅使用 1 行以保持简单)
tableA
| id | action | date |
|---|---|---|
| u1 | insert | 20210428 |
表B
| id | name | date |
|---|---|---|
| u1 | some name | 20210428 |
我需要返回以下内容:
{
"A": [
{
"id":"u1",
"action": "insert",
"date": "20210428"
}
],
"B": [
{
"id":"u1",
"name": "some name"
"date": "20210428",
}
]
}
我尝试了很多方法,但最接近的方法是为每个表执行以下操作:
val tableADF = spark.read.format("delta").load(path +"/tableA")
val tableADF = spark.read.format("delta").load(path +"/tableB")
为每个表创建将所有值转换为 json 的数据框
val tableAJsonDF = tableADF.groupBy("date").agg(collect_list(struct($"id",$"action")).alias("attributesA"))
val tableBJsonDF = tableBDF.groupBy("date").agg(collect_list(struct($"id",$"name")).alias("attributesB"))
| date | attributesA |
|---|---|
| 20210428 | [{"id":"u1", "action": "insert"}] |
| date | attributesB |
|---|---|
| 20210428 | [{"id":"u1", "name": "some name"}] |
现在将两个表中的 json 合并为一个 json,以添加到新的数据帧中:
val schema = new StructType().add("request", StringType)
val requestDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
val resultDF = requestDF.withColumn("request", concat(to_json(tableAJsonDF("attributesA")),
to_json(tableBJsonDF("attributesB"))))
但我收到以下错误。我读到当您尝试组合两个数据帧时会发生这种类型的错误,但我似乎无法找到一种方法来创建 1 个单个 json,如通过将两个属性组合到 1 个新列中所期望的结果所示,有什么想法吗?
org.apache.spark.sql.AnalysisException:已解决的属性 请求中缺少属性A#4726,属性B#4783 #24790 运算符 !Project [concat(to_json(attributesA#4726, Some(EST)), to_json(attributesB#4783, Some(EST))) AS request#24792].;;
【问题讨论】:
标签: json scala apache-spark apache-spark-sql