【问题标题】:scala spark issue resolving attributes when creating a new columnscala spark问题在创建新列时解决属性
【发布时间】:2021-07-19 23:31:00
【问题描述】:

我正在使用 scala 2.12 spark 3.0.0。我必须创建一个单个 json 字符串,其中每个属性来自不同的表,并将生成的 json 保存在另一个数据框中,即(在此示例中仅使用 1 行以保持简单)

tableA

id action date
u1 insert 20210428

表B

id name date
u1 some name 20210428

我需要返回以下内容:

{
    "A": [ 
    {
      "id":"u1",
      "action": "insert",
      "date": "20210428"
    }
    ],
    "B": [
    {
      "id":"u1",
      "name": "some name"
      "date": "20210428",
    } 
   ]
}  

我尝试了很多方法,但最接近的方法是为每个表执行以下操作:

val tableADF = spark.read.format("delta").load(path +"/tableA")
val tableADF = spark.read.format("delta").load(path +"/tableB")

为每个表创建将所有值转换为 json 的数据框

val tableAJsonDF = tableADF.groupBy("date").agg(collect_list(struct($"id",$"action")).alias("attributesA"))
val tableBJsonDF = tableBDF.groupBy("date").agg(collect_list(struct($"id",$"name")).alias("attributesB"))
date attributesA
20210428 [{"id":"u1", "action": "insert"}]
date attributesB
20210428 [{"id":"u1", "name": "some name"}]

现在将两个表中的 json 合并为一个 json,以添加到新的数据帧中:

val schema = new StructType().add("request", StringType)
val requestDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val resultDF = requestDF.withColumn("request", concat(to_json(tableAJsonDF("attributesA")), 
                                                      to_json(tableBJsonDF("attributesB"))))

但我收到以下错误。我读到当您尝试组合两个数据帧时会发生这种类型的错误,但我似乎无法找到一种方法来创建 1 个单个 json,如通过将两个属性组合到 1 个新列中所期望的结果所示,有什么想法吗?

org.apache.spark.sql.AnalysisException:已解决的属性 请求中缺少属性A#4726,属性B#4783 #24790 运算符 !Project [concat(to_json(attributesA#4726, Some(EST)), to_json(attributesB#4783, Some(EST))) AS request#24792].;;

【问题讨论】:

    标签: json scala apache-spark apache-spark-sql


    【解决方案1】:

    您需要加入数据框。例如

    val t1 = tableADF.select(
        col("id"), 
        array(struct(tableADF.columns.map(col):_*)).as("A")
    )
    
    val t2 = tableBDF.select(
        col("id"), 
        array(struct(tableBDF.columns.map(col):_*)).as("B")
    )
    
    val result = t1.join(t2, Seq("id")).select(to_json(struct("A", "B")).as("result"))
    
    result.show(false)
    +--------------------------------------------------------------------------------------------------------------+
    |result                                                                                                        |
    +--------------------------------------------------------------------------------------------------------------+
    |{"A":[{"id":"u1","action":"insert","date":"20210428"}],"B":[{"id":"u1","name":"some name","date":"20210428"}]}|
    +--------------------------------------------------------------------------------------------------------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-05-08
      • 2018-12-06
      • 1970-01-01
      • 1970-01-01
      • 2019-09-11
      • 2021-11-20
      相关资源
      最近更新 更多