【Question title】:How to process an array-of-JSON column in a Spark SQL DataFrame
【Posted】:2020-07-23 06:25:06
【Question】:

Input JSON

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId       |newSub     |score|scoreBoard                                                                                      |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7    |A    |abc@gmail.com|Environment|95   |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]]   |abc        |
|8    |A    |xyz@gmail.com|Environment|95   |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]]   |xyz        |
|9    |A    |efg@gmail.com|Environment|95   |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg        |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+

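The processing I want:

  1. Add newSub as a JSON entry in the scoreBoard list (reading newSub, score, and grade from the student's row)

  2. Sort the entries by score and remove the lowest-scoring JSON entry from the scoreBoard list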
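
To make the two steps concrete, here is a minimal sketch of the intended behaviour on plain Scala collections, without Spark. The names `Entry`, `ScoreBoardLogic`, and `addSortAndTrim` are hypothetical and only illustrate the logic; they are not part of any Spark API.

```scala
// Plain-Scala model of one scoreBoard entry (hypothetical, not a Spark type)
final case class Entry(subject: String, score: Long, grade: String)

object ScoreBoardLogic {
  // Step 1: put the new subject on the board.
  // Step 2: sort by score, highest first (sortBy is stable, so ties keep
  //         their original order), then drop the last entry, which is the
  //         lowest-scoring one.
  def addSortAndTrim(board: Seq[Entry], newEntry: Entry): Seq[Entry] =
    (newEntry +: board).sortBy(e => -e.score).dropRight(1)
}
```

For student abc this drops Hindi (75) and leaves Environment, Math, Science, History, English, Geography, so the board keeps its original size of six entries.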

Expected output:

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId       |scoreBoard                                                                                         |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7    |abc@gmail.com|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc        |
|8    |xyz@gmail.com|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]]  |xyz        |
|9    |efg@gmail.com|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg        |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+

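What I have tried:

First approach: process the column in a UDF, but sorting the scoreBoard column and removing JSON entries from it inside a UDF is challenging.

Second approach: explode the scoreBoard column, which gives 6 rows per student, one per subject. The challenge here is how to process the data group-wise, for example how to add a new row for the new subject, sort each student's subject scores, and remove one row.

I need help choosing an approach to this problem, and I would like to know whether there is any new or different efficient way to do the same processing. Thanks!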

【Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Answer 1】:
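    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, struct, udf}
    import org.apache.spark.sql.types._
    import ss.implicits._

    // Element type of scoreBoard; the field order matches the schema inferred from the JSON
    val schema = ArrayType(StructType(Seq(
      StructField("grade", StringType, nullable = true),
      StructField("score", LongType, nullable = true),
      StructField("subject", StringType, nullable = true))), containsNull = true)

    // Append the new entry to the existing scoreBoard
    val addValue = udf((array: Seq[Row], newval: Row) => array :+ newval, schema)

    // Sort by score, highest first, then drop the last (lowest-scoring) entry
    val sortAndRemove = udf((array: Seq[Row]) =>
      array.sortBy(_.getAs[Long]("score"))(Ordering[Long].reverse).dropRight(1), schema)

    // Build the new entry from the row's own columns, then add, sort, and trim
    val df2 = df
      .withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
      .withColumn("scoreBoard", sortAndRemove(addValue(col("scoreBoard"), col("map_col"))))
    df2.select("scoreBoard").show(false)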
    

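    This is the UDF approach, where ss is the SparkSession. On Spark 2.4 and later, addValue can be replaced with array_union.

    The code above works on Spark 2.0 and later. On Spark 3.x the untyped udf(f, dataType) overload is rejected by default and additionally needs spark.sql.legacy.allowUntypedScalaUDF set to true.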
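
As a sketch of the Spark 2.4+ variant mentioned in this answer, the whole add, sort, and trim step can be written with built-in functions and no UDF. This is an illustration rather than the answerer's code: the object name `BuiltinScoreBoard` and the helper columns `merged` and `byScore` are hypothetical, and it assumes the schema Spark infers from the question's JSON. Note that `array_union` also removes duplicate entries, and that `sort_array` compares structs field by field, so `score` is moved to the first position before sorting.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, array_union, col, expr, struct}

object BuiltinScoreBoard {
  // Assumes columns: grade, score, newSub, and
  // scoreBoard: array<struct<grade:string, score:bigint, subject:string>>
  def process(df: DataFrame): DataFrame = {
    val newEntry = struct(col("grade"), col("score"), col("newSub").as("subject"))
    df
      // Add the new entry; array_union drops exact duplicates
      .withColumn("merged", array_union(array(newEntry), col("scoreBoard")))
      // Put score first so sort_array orders by score, highest first
      .withColumn("byScore", expr(
        """sort_array(transform(merged, x ->
          |  named_struct('score', x.score, 'subject', x.subject, 'grade', x.grade)), false)""".stripMargin))
      // Drop the last (lowest-scoring) entry and restore the original field order
      .withColumn("scoreBoard", expr(
        """transform(slice(byScore, 1, size(byScore) - 1), x ->
          |  named_struct('grade', x.grade, 'score', x.score, 'subject', x.subject))""".stripMargin))
      .drop("merged", "byScore", "newSub", "grade", "score")
  }
}
```

Unlike the stable sort in the UDF, ties on `score` are broken here by `subject` in descending order, so entries with equal scores may come out in a different order.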

    【Comments】:

    • What if the schema also has an epoch-day attribute alongside grade, score, and subject? -> {"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score" : 95,"currentDay" : "18358","scoreBoard" : [{"subject":"Math","score":90,"grade":"A","day":"18340"},{"subject":"Science","score":82,"grade":"A","day":"18354"},{"subject":"History","score":80,"grade":"A","day":"18355"},{"subject":"Environment","score":95,"grade":"A","day":"18356"},{"subject":"English","score":80,"grade":"A","day":"18356"},{"subject":"Geography","score":80,"grade":"A","day":"18356"}]}
    • The resulting DataFrame row:
      |class|currentDay|grade|mailId       |newSub     |score|scoreBoard|studentName|
      |7    |18358     |A    |abc@gmail.com|Environment|95   |[[18340,A,90,Math], [18354,A,82,Science], [18355,A,80,History], [18356,A,95,Environment], [18356,A,80,English], [18356,A,80,Geography]]|abc|
    • 1. If newSub is already on the scoreBoard with the same score (for example, Environment has 95 both in the column and in scoreBoard), I don't want to add a new entry. 2. I want to compare the day of every scoreBoard entry with the currentDay column and remove the entries that are more than 10 days older than currentDay (there may be 1, 2, n, or no old records to remove). Here day = 18340 is old, and I want to remove such entries. Is this possible? How can I achieve it with a UDF? Thanks in advance.
    • The sortAndRemove UDF can be modified to do that.
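
One way the modified logic from the comments above might look, sketched on plain Scala collections so it can be checked without Spark. Everything here is an assumption for illustration: the names `DatedEntry`, `BoardRefresh`, and `refresh` are hypothetical, the `day` and `currentDay` strings are assumed to have been parsed to `Long` already, and "old" is taken to mean more than 10 days before `currentDay`.

```scala
// Hypothetical plain-Scala model of a scoreBoard entry that carries an epoch day
final case class DatedEntry(subject: String, score: Long, grade: String, day: Long)

object BoardRefresh {
  def refresh(board: Seq[DatedEntry], incoming: DatedEntry,
              currentDay: Long, maxAgeDays: Long = 10): Seq[DatedEntry] = {
    // Rule 2: keep only entries that are at most maxAgeDays older than currentDay
    val fresh = board.filter(e => currentDay - e.day <= maxAgeDays)
    // Rule 1: skip the new entry when the same subject with the same score is already present
    val alreadyThere = fresh.exists(e => e.subject == incoming.subject && e.score == incoming.score)
    if (alreadyThere) fresh else incoming +: fresh
  }
}
```

The same function body could be placed inside a UDF that receives the scoreBoard array, the new entry, and currentDay, reading the fields from each Row instead of from a case class.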
    【Answer 2】:

    This approach uses Spark DataFrames/Datasets and Spark SQL.

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{Row, SparkSession}
    
    object ProcessingList {
      val spark = SparkSession
        .builder()
        .appName("ProcessingList")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
        .config("spark.app.id","ProcessingList") // To silence Metrics warning
        .getOrCreate()
    
      val sc = spark.sparkContext
    
      val sqlContext = spark.sqlContext
    
      val input = "/home/cloudera/files/tests/list_processing.json"
    
      case class Student(cl: Long, grade: String,mail : String,ns: String,score: Long,sbGrade: String, sbScore: Long,sbSubject: String, name: String)
    
      def main(args: Array[String]): Unit = {
    
        Logger.getRootLogger.setLevel(Level.ERROR)
    
        try {
          import spark.implicits._
    
          val studentTest = sqlContext
            .read
            .json(input)
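            // Positional getters follow the inferred (alphabetical) column order:
            // 0 class, 1 grade, 2 mailId, 3 newSub, 4 score, 5 scoreBoard, 6 studentName
            .flatMap(r => r.getSeq[Row](5).map(sq =>
              Student(r.getLong(0), r.getString(1), r.getString(2), r.getString(3), r.getLong(4),
                sq.getString(0), sq.getLong(1), sq.getString(2), r.getString(6))))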
            .cache()
    
          studentTest.show(truncate = false)
    
          studentTest.createOrReplaceTempView("student_test")
    
          sqlContext
              .sql(
                """
                  |SELECT cl, grade, mail, ns, score,
                  |RANK() OVER(PARTITION BY cl ORDER BY sbScore DESC) AS points,
                  |sbGrade, sbScore, sbSubject, name
                  |FROM student_test
                  |ORDER BY cl, points
                  |""".stripMargin)
              .show(truncate = false)
    
    
          // To have the opportunity to view the web console of Spark: http://localhost:4041/
          println("Type whatever to the console to exit......")
          scala.io.StdIn.readLine()
        } finally {
          sc.stop()
          println("SparkContext stopped")
          spark.stop()
          println("SparkSession stopped")
        }
      }
    }
    

    This produces the following result:

    +---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
    |cl |grade|mail         |ns         |score|points|sbGrade|sbScore|sbSubject|name|
    +---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
    |7  |A    |abc@gmail.com|Environment|95   |1     |A      |90     |Math     |abc |
    |7  |A    |abc@gmail.com|Environment|95   |2     |A      |82     |Science  |abc |
    |7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |History  |abc |
    |7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |English  |abc |
    |7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |Geography|abc |
    |7  |A    |abc@gmail.com|Environment|95   |6     |B      |75     |Hindi    |abc |
    |8  |A    |xyz@gmail.com|Environment|95   |1     |A      |90     |Math     |xyz |
    |8  |A    |xyz@gmail.com|Environment|95   |2     |A      |87     |Biology  |xyz |
    |8  |A    |xyz@gmail.com|Environment|95   |3     |A      |85     |Physics  |xyz |
    |8  |A    |xyz@gmail.com|Environment|95   |4     |A      |80     |Chemistry|xyz |
    |8  |A    |xyz@gmail.com|Environment|95   |5     |B      |75     |Hindi    |xyz |
    |8  |A    |xyz@gmail.com|Environment|95   |6     |B      |70     |English  |xyz |
    |9  |A    |efg@gmail.com|Environment|95   |1     |A      |95     |Computer |efg |
    |9  |A    |efg@gmail.com|Environment|95   |2     |A      |91     |Math     |efg |
    |9  |A    |efg@gmail.com|Environment|95   |3     |A      |82     |English  |efg |
    |9  |A    |efg@gmail.com|Environment|95   |4     |B      |77     |Physics  |efg |
    |9  |A    |efg@gmail.com|Environment|95   |5     |B      |76     |Biology  |efg |
    |9  |A    |efg@gmail.com|Environment|95   |6     |B      |72     |Chemistry|efg |
    +---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
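
The ranked output above still has one row per subject and does not yet include newSub. A possible continuation, sketched under assumptions and not part of the original answer, is to explode the raw DataFrame, union in the new entry, drop the lowest-ranked row per student, and collect the rows back into one array. The object name `RegroupSketch` and the helper columns `rn` and `cnt` are hypothetical; `df` is assumed to be the DataFrame read directly from the question's JSON.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object RegroupSketch {
  def regroup(df: DataFrame): DataFrame = {
    // One row per existing scoreBoard entry
    val entries = df
      .select(col("studentName"), col("mailId"), col("class"), explode(col("scoreBoard")).as("e"))
      .select(col("studentName"), col("mailId"), col("class"),
        col("e.score").as("score"), col("e.subject").as("subject"), col("e.grade").as("grade"))
    // One extra row per student for the new subject (same column order, as union is positional)
    val added = df.select(col("studentName"), col("mailId"), col("class"),
      col("score"), col("newSub").as("subject"), col("grade"))

    val byStudent = Window.partitionBy("mailId")
    entries.union(added)
      .withColumn("rn", row_number().over(byStudent.orderBy(col("score").desc, col("subject"))))
      .withColumn("cnt", count(lit(1)).over(byStudent))
      // The row whose position equals the row count is the lowest-scoring one
      .where(col("rn") < col("cnt"))
      .groupBy("studentName", "mailId", "class")
      // Struct fields are ordered score, subject, grade so that sort_array sorts by score
      .agg(sort_array(collect_list(struct(col("score"), col("subject"), col("grade"))), asc = false)
        .as("scoreBoard"))
  }
}
```

This version shuffles the data for the window and the groupBy, so for arrays this small a per-row approach is likely cheaper; it is shown only to complete the explode-based route the question asks about.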
    

    【Comments】:
