【问题标题】:How to get all columns after groupby on Dataset<Row> in spark sql 2.1.0如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列
【发布时间】:2017-01-05 07:05:24
【问题描述】:

首先,我对 SPARK 很陌生

我的数据集中有数百万条记录,我想用名称列分组并查找具有最大年龄的名称。我得到了正确的结果,但我需要结果集中的所有列。

Dataset<Row> resultset = studentDataSet.select("*").groupBy("name").max("age");
resultset.show(1000,false);

我的结果集数据集中只有姓名和最大(年龄)。

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    对于您的解决方案,您必须尝试不同的方法。您几乎可以找到解决方案,但让我帮助您理解。

    Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");
    

    现在你可以做的是加入resultsetstudentDataSet

    Dataset<Row> joinedDS = studentDataset.join(resultset, "name");
    

    groupBy 的问题在于,在应用 groupBy 后,您会得到 RelationalGroupedDataset,因此这取决于您执行的下一个操作,例如 sum, min, mean, max 等,然后这些操作的结果与 groupBy 结合使用

    在您的情况下,name 列与 agemax 连接,因此它将仅返回两列,但如果在 age 上使用应用 groupBy,然后在“年龄”上应用 max列您将得到两列,第一列是age,第二列是max(age)

    注意 :- 代码未经测试,如有需要请进行更改 希望这可以清除您的查询

    【讨论】:

    • 嘿,谢谢! studentDataset.join(resultset, expression) 解决了我的问题。
    • 我知道这是 1 年前的帖子,但仍然是嘿阿努普!很高兴知道您找到了解决方案,您能否通过发布您的代码来帮助我们?
    • 嘿 Akash 和 Anup 这对我不起作用。即使在加入之后,我也只得到两列而不是其他列。它是否也与火花版本有关。我正在使用火花 2.1
    • 我也认为加入可以产生第二轮洗牌,所以我添加了另一个解决方案,在最后不加入并保持使用严格的数据集使用(无数据帧)。
    【解决方案2】:

    接受的答案并不理想,因为它需要加入。加入大数据帧可能会导致执行缓慢的大洗牌。

    让我们创建一个示例数据集并测试代码:

    val df = Seq(
      ("bob", 20, "blah"),
      ("bob", 40, "blah"),
      ("karen", 21, "hi"),
      ("monica", 43, "candy"),
      ("monica", 99, "water")
    ).toDF("name", "age", "another_column")
    

    此代码在大型 DataFrame 上运行速度应该更快。

    df
      .groupBy("name")
      .agg(
        max("name").as("name1_dup"), 
        max("another_column").as("another_column"),  
        max("age").as("age")
      ).drop(
        "name1_dup"
      ).show()
    
    +------+--------------+---+
    |  name|another_column|age|
    +------+--------------+---+
    |monica|         water| 99|
    | karen|            hi| 21|
    |   bob|          blah| 40|
    +------+--------------+---+
    

    【讨论】:

    • .agg( max("name").as("name1_dup"), max("another_column").as("another_column"), max("age").as("age ") ) 给出编译时异常
    • 如果您将行 ("monica", 43, "candy") 更改为 ("monica", 43, "zebra"),此示例工作。 @AnujMehra - 代码在 REPL 中为我解释。
    【解决方案3】:

    你想要达到的目标是

    1. 按年龄分组行
    2. 将每组减少到 1 行,最大年龄

    此替代方案无需使用聚合即可实现此输出

    import org.apache.spark.sql._
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    
    object TestJob5 {
    
      def main (args: Array[String]): Unit = {
    
        val sparkSession = SparkSession
          .builder()
          .appName(this.getClass.getName.replace("$", ""))
          .master("local")
          .getOrCreate()
    
        val sc = sparkSession.sparkContext
        sc.setLogLevel("ERROR")
    
        import sparkSession.sqlContext.implicits._
    
        val rawDf = Seq(
          ("Moe",  "Slap",  7.9, 118),
          ("Larry",  "Spank",  8.0, 115),
          ("Curly",  "Twist", 6.0, 113),
          ("Laurel", "Whimper", 7.53, 119),
          ("Hardy", "Laugh", 6.0, 118),
          ("Charley",  "Ignore",   9.7, 115),
          ("Moe",  "Spank",  6.8, 118),
          ("Larry",  "Twist", 6.0, 115),
          ("Charley",  "fall", 9.0, 115)
        ).toDF("name", "requisite", "funniness_of_requisite", "age")
    
        rawDf.show(false)
        rawDf.printSchema
    
        val nameWindow = Window
          .partitionBy("name")
    
        val aggDf = rawDf
          .withColumn("id", monotonically_increasing_id)
          .withColumn("maxFun", max("funniness_of_requisite").over(nameWindow))
          .withColumn("count", count("name").over(nameWindow))
          .withColumn("minId", min("id").over(nameWindow))
          .where(col("maxFun") === col("funniness_of_requisite") && col("minId") === col("id") )
          .drop("maxFun")
          .drop("minId")
          .drop("id")
    
        aggDf.printSchema
    
        aggDf.show(false)
      }
    
    }
    

    请记住,一个组可能有超过 1 行的最大年龄,因此您需要通过某种逻辑选择一个。在示例中,我认为这无关紧要,所以我只分配一个唯一的数字来选择

    【讨论】:

      【解决方案4】:

      注意到随后的连接是额外的洗牌,并且其他一些解决方案在返回中似乎不准确,甚至将数据集转换为数据帧,我寻求更好的解决方案。这是我的:

      case class People(name: String, age: Int, other: String)   
      val df = Seq(
        People("Rob", 20, "cherry"),
        People("Rob", 55, "banana"),
        People("Rob", 40, "apple"),
        People("Ariel", 55, "fox"),
        People("Vera", 43, "zebra"),
        People("Vera", 99, "horse")
      ).toDS
      
      val oldestResults = df
       .groupByKey(_.name)
       .mapGroups{ 
          case (nameKey, peopleIter) => {
              var oldestPerson = peopleIter.next  
              while(peopleIter.hasNext) {
                  val nextPerson = peopleIter.next
                  if(nextPerson.age > oldestPerson.age) oldestPerson = nextPerson 
              }
              oldestPerson
          }
        }    
        oldestResults.show  
      

      以下产生:

      +-----+---+------+
      | name|age| other|
      +-----+---+------+
      |Ariel| 55|   fox|
      |  Rob| 55|banana|
      | Vera| 99| horse|
      +-----+---+------+
      

      【讨论】:

        【解决方案5】:

        您需要记住聚合函数会减少行数,因此您需要使用减少函数指定您想要哪些行年龄。如果您想保留组的所有行(警告!这可能导致爆炸或分区倾斜),您可以将它们收集为一个列表。然后,您可以使用 UDF(用户定义的函数)按照您的标准减少它们,在这个例子中是 funniness_of_requisite。然后使用另一个 UDF 从单个缩减行扩展属于缩减行的列。 出于此答案的目的,我假设您希望保留具有最大 funniness_of_requisite 的人的年龄。

        import org.apache.spark.sql._
        import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.types.{IntegerType, StringType}
        
        import scala.collection.mutable
        
        
        object TestJob4 {
        
        def main (args: Array[String]): Unit = {
        
        val sparkSession = SparkSession
          .builder()
          .appName(this.getClass.getName.replace("$", ""))
          .master("local")
          .getOrCreate()
        
        val sc = sparkSession.sparkContext
        
        import sparkSession.sqlContext.implicits._
        
        val rawDf = Seq(
          (1, "Moe",  "Slap",  7.9, 118),
          (2, "Larry",  "Spank",  8.0, 115),
          (3, "Curly",  "Twist", 6.0, 113),
          (4, "Laurel", "Whimper", 7.53, 119),
          (5, "Hardy", "Laugh", 6.0, 18),
          (6, "Charley",  "Ignore",   9.7, 115),
          (2, "Moe",  "Spank",  6.8, 118),
          (3, "Larry",  "Twist", 6.0, 115),
          (3, "Charley",  "fall", 9.0, 115)
        ).toDF("id", "name", "requisite", "funniness_of_requisite", "age")
        
        rawDf.show(false)
        rawDf.printSchema
        
        val rawSchema = rawDf.schema
        
        val fUdf = udf(reduceByFunniness, rawSchema)
        
        val nameUdf = udf(extractAge, IntegerType)
        
        val aggDf = rawDf
          .groupBy("name")
          .agg(
            count(struct("*")).as("count"),
            max(col("funniness_of_requisite")),
            collect_list(struct("*")).as("horizontal")
          )
          .withColumn("short", fUdf($"horizontal"))
          .withColumn("age", nameUdf($"short"))
          .drop("horizontal")
        
        aggDf.printSchema
        
        aggDf.show(false)
        }
        
        def reduceByFunniness= (x: Any) => {
        
        val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]]
        
        val red = d.reduce((r1, r2) => {
        
          val funniness1 = r1.getAs[Double]("funniness_of_requisite")
          val funniness2 = r2.getAs[Double]("funniness_of_requisite")
        
          val r3 = funniness1 match {
            case a if a >= funniness2 =>
              r1
            case _ =>
              r2
          }
        
          r3
        })
        
        red
        }
        
        def extractAge = (x: Any) => {
        
        val d = x.asInstanceOf[GenericRowWithSchema]
        
        d.getAs[Int]("age")
        }
         }
        
          d.getAs[String]("name")
        }
        }
        

        这是输出

        +-------+-----+---------------------------+-------------------------------+---+
        |name   |count|max(funniness_of_requisite)|short                          
        |age|
        +-------+-----+---------------------------+-------------------------------+---+
        |Hardy  |1    |6.0                        |[5, Hardy, Laugh, 6.0, 18]     
        |18 |
        |Moe    |2    |7.9                        |[1, Moe, Slap, 7.9, 118]       
        |118|
        |Curly  |1    |6.0                        |[3, Curly, Twist, 6.0, 113]    
        |113|
        |Larry  |2    |8.0                        |[2, Larry, Spank, 8.0, 115]    
        |115|
        |Laurel |1    |7.53                       |[4, Laurel, Whimper, 7.53, 119]|119|
        |Charley|2    |9.7                        |[6, Charley, Ignore, 9.7, 115] |115|
        +-------+-----+---------------------------+-------------------------------+---+
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2018-04-15
          • 1970-01-01
          • 2018-10-02
          • 2016-08-05
          • 1970-01-01
          • 1970-01-01
          • 2020-05-11
          • 1970-01-01
          相关资源
          最近更新 更多