【问题标题】:Rank values in Spark on a column based on previous values基于先前值对 Spark 中的值进行排名
【发布时间】:2021-09-23 13:11:23
【问题描述】:

我有一个这样的数据框:

df = spark.createDataFrame(
    [
        (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
    ],
    ["ts", "value", "group"]
)

我想使用所有以前的值(按时间戳 ts 排序)来获取值列的排名。例如:

+-------------------+-----+-----+----+
|                 ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:00| 2.15|    a|   1|
|2021-05-01 10:30:10| 2.12|    a|   1|
|2021-05-01 10:30:20| 2.13|    a|   2|
|2021-05-01 10:30:50| 2.14|    a|   3|
|2021-05-01 10:31:05| 2.13|    a|   2|
|2021-05-01 10:31:10| 2.16|    a|   5|
|2021-05-01 10:31:10| 2.16|    b|   1|
+-------------------+-----+-----+----+

我尝试了以下代码:

w = (
    Window
    .partitionBy("group")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.select(
    "*", 
    f.rank().over(w).alias("rank")
).show()

但基本上只是在时间戳上对列进行排名。

知道怎么做吗?

【问题讨论】:

    标签: apache-spark pyspark aggregate-functions


    【解决方案1】:

    rank 函数按orderBy 子句对数据进行排序,因此不能按另一列对其进行排序。您可以将其用作替代方法

    df = (df
          .withColumn("rank", F.array_sort(F.collect_set('value').over(w)))
          .withColumn('rank', F.expr("array_position(rank, value)")))
    df.show()
    
    +-------------------+-----+-----+----+
    |                 ts|value|group|rank|
    +-------------------+-----+-----+----+
    |2021-05-01 10:31:10| 2.16|    b|   1|
    |2021-05-01 10:30:00| 2.15|    a|   1|
    |2021-05-01 10:30:10| 2.12|    a|   1|
    |2021-05-01 10:30:20| 2.13|    a|   2|
    |2021-05-01 10:30:50| 2.14|    a|   3|
    |2021-05-01 10:31:05| 2.13|    a|   2|
    |2021-05-01 10:31:10| 2.16|    a|   5|
    +-------------------+-----+-----+----+
    

    如果你想获得dense_rank,请使用collect_list

    【讨论】:

    • 谢谢,正是我需要的!
    【解决方案2】:

    将您的 orderBy() 列更改为 value

    import datetime as dt
    df = spark.createDataFrame(
        [
            (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
            (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
            (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
            (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
            (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
            (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
            (dt.datetime(2021, 5, 1, 10, 31, 11), 2.17, "b"),
        ],
        ["ts", "value", "group"]
    )
    w = (
        W
        .partitionBy("group")
        .orderBy("value")
    )
    df.select(
        "*", 
        F.rank().over(w).alias("rank")
    ).show()
    
    +-------------------+-----+-----+----+
    |                 ts|value|group|rank|
    +-------------------+-----+-----+----+
    |2021-05-01 10:30:10| 2.12|    a|   1|
    |2021-05-01 10:30:20| 2.13|    a|   2|
    |2021-05-01 10:31:05| 2.13|    a|   2|
    |2021-05-01 10:30:50| 2.14|    a|   4|
    |2021-05-01 10:30:00| 2.15|    a|   5|
    |2021-05-01 10:31:10| 2.16|    b|   1|
    |2021-05-01 10:31:11| 2.17|    b|   2|
    +-------------------+-----+-----+----+
    

    【讨论】:

    • 问题是我只想对当前时间戳之前到达的值执行排名操作。如果我只使用 orderBy 的值,它会将排名独立于到达时间应用于所有值。
    猜你喜欢
    • 2017-09-17
    • 1970-01-01
    • 1970-01-01
    • 2021-01-13
    • 1970-01-01
    • 2015-09-09
    • 1970-01-01
    • 2014-08-25
    • 2020-11-05
    相关资源
    最近更新 更多