【问题标题】:Spark dataframe to nested JSONSpark 数据帧到嵌套 JSON
【发布时间】:2019-04-11 12:51:46
【问题描述】:

我有一个数据框 joinDf 是通过在 userId 上加入以下四个数据框创建的:

val detailsDf = Seq((123,"first123","xyz"))
                .toDF("userId","firstName","address")


val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
              .toDF("userId","email")


val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
            .toDF("userId","foodName","isFavFood","cuisine","score")


val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
             .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
            .join(emailDf, Seq("userId"))
            .join(foodDf, Seq("userId"))
            .join(gameDf, Seq("userId"))

User 的食物和游戏收藏应按分数升序排列。

我正在尝试从此joinDf 创建一个结果,其中 JSON 如下所示:

[
  {
  "userId": "123",
  "firstName": "first123",
  "address": "xyz",
  "UserFoodFavourites": [
    {
     "foodName": "food1",
     "isFavFood": "true",
     "cuisine": "Mediterranean",
    },
    {
     "foodName": "food2",
     "isFavFood": "false",
     "cuisine": "Italian",
    },
    {
     "foodName": "food3",
     "isFavFood": "true",
     "cuisine": "American",
    }
   ]
   "UserEmail": [
     "abc@gmail.com",
     "def@gmail.com"
   ]
   "UserGameFavourites": [
     {
      "gameName": "football",
      "isOutdoor": "true"
     },
     {
      "gameName": "chess",
      "isOutdoor": "false"
     }
   ]
  }
]

我应该使用joinDf.groupBy().agg(collect_set())吗?

任何帮助将不胜感激。

【问题讨论】:

    标签: apache-spark dataframe apache-spark-sql apache-spark-dataset


    【解决方案1】:

    我的解决方案基于找到的答案 herehere

    它使用窗口功能。它展示了如何根据食物评分为给定的userid 创建食物偏好的嵌套列表。在这里,我们从我们拥有的列中创建 FoodDetailsstruct

    val foodModifiedDf = foodDf.withColumn("FoodDetails",
                                struct("foodName","isFavFood", "cuisine","score"))
                                .drop("foodName","isFavFood", "cuisine","score")
    
    println("Just printing the food detials here")
    foodModifiedDf.show(10, truncate = false)
    

    在这里,我们正在创建一个窗口函数,它将根据FoodDetails.score 按降序累积userId 的列表。应用时,窗口函数会在遇到具有相同userId 的新行时继续累积列表。累加完成后,我们必须对userId 做一个groupBy 以选择最大的列表。

    import org.apache.spark.sql.expressions.Window
    
    
    val window = Window.partitionBy("userId").orderBy( desc("FoodDetails.score"))
    
    val userAndFood = detailsDf.join(foodModifiedDf, "userId")
    
    val newUF  = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")
    
    println(" UserAndFood dataframe after windowing function applied")
    newUF.show(10, truncate = false)
    
    val resultUF = newUF.groupBy("userId")
                      .agg(max("FDNew"))
    
    println("Final result after select the maximum length list")
    resultUF.show(10, truncate = false)
    

    这是最终的结果:

    +------+-----------------------------------------------------------------------------------------+
    |userId|max(FDNew)                                                                               |
    +------+-----------------------------------------------------------------------------------------+
    |123   |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
    +------+-----------------------------------------------------------------------------------------+
    

    给定这个数据框,写出嵌套的 json 应该更容易。

    【讨论】:

    • 在这个解决方案中,您将如何将 detailsDf 中的 "firstName","address" 列转移到 resultDF ?
    • 聚合中可以使用newUF.groupBy("userId").agg(max("FDNew"), first(firstName), first(address))
    • 所以基本上加入后的groupBy用于在加入后对记录进行重复数据删除。更多的连接会给执行者增加额外的重复和额外的工作。
    • 正确阅读问题。我们需要通过score 对食物偏好进行排序。这就是 Window 函数在这里所做的。您可以自己运行代码并查看结果。
    • 确实,我错过了,当我这样做的时候已经很晚了,感谢您指出,我已经通过快速修复更新了我的答案并获得了所需的输出并避免了执行者的额外负担。
    【解决方案2】:

    在分组和收集列表之前加入的主要问题是加入会产生大量记录以使分组依据崩溃,在您的示例中是加入之后和分组依据之前的12条记录,您还需要担心从 12 个重复项中挑选出 "firstName","address" detailsDf。为了避免这两个问题,您可以使用 struct 和 groupBy 预处理食物、电子邮件和游戏数据帧,并将它们加入到 detailsDf 中,而不会由于具有相同 的多个记录而导致数据爆炸的风险连接表中的用户 ID

    val detailsDf = Seq((123,"first123","xyz"))
                .toDF("userId","firstName","address")
    
    
    val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
              .toDF("userId","email")
    
    
    val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
            .toDF("userId","foodName","isFavFood","cuisine","score")
    
    
    val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
             .toDF("userId","gameName","isOutdoor","score")
    
    val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))
    
    val foodGrp = foodDf
              .select($"userId", struct("score", "foodName","isFavFood","cuisine").as("UserFoodFavourites"))
              .groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))
    
    val gameGrp = gameDf
              .select($"userId", struct("gameName","isOutdoor","score").as("UserGameFavourites"))
              .groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))
    
    val result = detailsDf.join(emailGrp, Seq("userId"))
            .join(foodGrp, Seq("userId"))
            .join(gameGrp, Seq("userId"))
    
    result.show(100, false)
    

    输出:

    +------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
    |userId|firstName|address|UserEmail                     |UserFoodFavourites                                                                       |UserGameFavourites                      |
    +------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
    |123   |first123 |xyz    |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
    +------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
    

    由于所有 groupBy 都在 userId 上完成并加入,spark 将对其进行很好的优化。

    更新 1:在@user238607 指出我错过了按分数排序的食物偏好的原始要求后,做了一个快速修复并将 score 列放置为UserFoodFavourites 结构的第一个元素并使用 sort_array 函数按所需顺序排列数据,而无需强制进行额外的随机操作。更新了代码及其输出。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-12-30
      • 2020-08-24
      • 2020-11-08
      • 2022-01-09
      • 2018-09-07
      • 2021-07-02
      • 2021-07-18
      相关资源
      最近更新 更多