【问题标题】:Scala Spark use Window function to find max valueScala Spark 使用 Window 函数查找最大值
【发布时间】:2020-10-16 04:30:37
【问题描述】:

我有一个如下所示的数据集:

+------------------------|-----+
|               timestamp| zone|
+------------------------+-----+
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    B|
|    2019-01-01 01:05:00 |    C|
|    2019-01-01 02:05:00 |    B|
|    2019-01-01 02:05:00 |    B|
+------------------------+-----+

每小时我需要计算哪个区域的行数最多,并最终得到一个如下所示的表格:

+-----|-----+-----+
| hour| zone| max |
+-----+-----+-----+
|    0|    A|    2|
|    1|    C|    1|
|    2|    B|    2|
+-----+-----+-----+

我的说明说我需要使用 Window 函数和“group by”来找到我的最大计数。

我已经尝试了一些方法,但我不确定我是否接近。任何帮助将不胜感激。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用 2 个后续窗口函数来获得结果:

    df
      .withColumn("hour",hour($"timestamp"))
      .withColumn("cnt",count("*").over(Window.partitionBy($"hour",$"zone")))
      .withColumn("rnb",row_number().over(Window.partitionBy($"hour").orderBy($"cnt".desc)))
      .where($"rnb"===1)
      .select($"hour",$"zone",$"cnt".as("max"))
    

    【讨论】:

    • 我喜欢它,但我想知道为什么 row_number 不是 rank
    • 这是一个非常优雅的解决方案。我试过了,效果很好。我将 Chema 标记为正确,只是他先回答(当然他的回答也很有效)。
    • @JacekLaskowski 每小时获得 1 行,但这当然取决于要求,如果您可以拥有相同计数的 2 行,您也可以使用 rank
    • @b-ryce 帮助讨论如何在多个 Spark 答案之间进行选择,并从软件世界中借用它们,您可以根据性能、可扩展性等原则评估 Spark 程序/答案/模块化,最后是可维护性。在我们的例子中,我们可以考虑两个指标,执行计划和执行时间来评估 Spark 的性能。可维护性取决于代码结构和大小。可维护代码被认为是符合最佳实践/设计模式的代码。这些是在 Spark 程序之间进行选择的一些主要因素。
    【解决方案2】:

    您可以将Windowing functionsgroup by 与数据框一起使用。

    在您的情况下,您可以使用rank() over(partition by) 窗口函数。

    import org.apache.spark.sql.function._
    
    // first group by hour and zone
        val df_group = data_tms.
          select(hour(col("timestamp")).as("hour"), col("zone"))
          .groupBy(col("hour"), col("zone"))
          .agg(count("zone").as("max"))
    // second rank by hour order by max in descending order
        val df_rank = df_group.
          select(col("hour"),
            col("zone"),
            col("max"),
            rank().over(Window.partitionBy(col("hour")).orderBy(col("max").desc)).as("rank"))
    // filter by col rank = 1
        df_rank
          .select(col("hour"), 
            col("zone"), 
            col("max"))
          .where(col("rank") === 1)
          .orderBy(col("hour"))
         .show()
    
    /*
    +----+----+---+
    |hour|zone|max|
    +----+----+---+
    |   0|   A|  2|
    |   1|   C|  1|
    |   2|   B|  2|
    +----+----+---+
    */
    

    【讨论】:

      猜你喜欢
      • 2019-10-31
      • 2022-01-15
      • 1970-01-01
      • 2021-01-24
      • 2016-08-12
      • 2013-09-28
      • 1970-01-01
      • 2017-07-18
      相关资源
      最近更新 更多