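【Question title】: How to get the second highest value from a column in PySpark?
【Posted】: 2021-09-22 07:25:06
【Question description】:

I have a PySpark DataFrame and, after grouping by two columns (CUSTOMER_ID and ADDRESS_ID), I want to get the second highest value of ORDERED_TIME (a datetime field in yyyy-mm-dd format).

A customer can have many orders associated with one address, and I want the second most recent order for each (customer, address) pair.

My approach was to build a window partitioned by CUSTOMER_ID and ADDRESS_ID and ordered by ORDERED_TIME: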

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))

However, I get an error: ValueError: 'over' is not in list

Can anyone suggest the right way to solve this?

Please let me know if any other information is needed.

Sample data

+-----------+----------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  | 
+-----------+----------+-------------------+
|        100| 1000     |2021-01-02         |
|        100| 1000     |2021-01-14         |
|        100| 1000     |2021-01-03         |
|        100| 1000     |2021-01-04         |
|        101| 2000     |2020-05-07         |
|        101| 2000     |2021-04-14         |
+-----------+----------+-------------------+

Expected output

+-----------+----------+-------------------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  |second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 1000     |2021-01-02         |2021-01-04         |
|        100| 1000     |2021-01-14         |2021-01-04         |
|        100| 1000     |2021-01-03         |2021-01-04         |
|        100| 1000     |2021-01-04         |2021-01-04         |
|        101| 2000     |2020-05-07         |2020-05-07         |
|        101| 2000     |2021-04-14         |2020-05-07         |
+-----------+----------+-------------------+-------------------+

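【Question comments】:

  • Could you provide some sample data along with the expected output?
  • Sorry, I have edited the question to include sample input and output.

Tags: python dataframe apache-spark pyspark group-by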


【Solution 1】:

Here is another approach, using collect_list:

import pyspark.sql.functions as F
from pyspark.sql import Window


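# Order each (customer, address) partition by time, newest first, and extend the
# frame to the whole partition so every row sees the full list.
sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.col("ORDERED_TIME").desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df2 = (
  df
  # Index 1 of the collected list is the second most recent ORDERED_TIME.
  .withColumn("second_recent_order", (F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times))[1])
)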
df2.show()

【Discussion】:

    【Solution 2】:

    You can use a window here as follows, but you will get null if a group contains only one row:

    
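    from pyspark.sql import Window
    from pyspark.sql.functions import collect_list, desc

    # Newest order first; the frame spans the whole partition.
    sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(desc('ORDERED_TIME')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    df2 = df2.withColumn(
        "second_recent_order",
        # Element 1 of the ordered list is the second most recent time.
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
    )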
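
The null case mentioned above is easy to see in a Spark-free model. The sketch below is plain Python, not PySpark: it groups hypothetical (customer, address, time) tuples, sorts each group's times in descending order, and takes index 1, returning None where Spark (with ANSI mode off) would return null. The function name second_most_recent and the sample rows, including the single-row group for customer 102, are illustrative assumptions.

```python
from collections import defaultdict


def second_most_recent(rows):
    """Map each (customer_id, address_id) pair to its second most recent time.

    Models collect_list(...).getItem(1) over a descending window:
    groups with fewer than two rows map to None.
    """
    groups = defaultdict(list)
    for customer_id, address_id, ordered_time in rows:
        groups[(customer_id, address_id)].append(ordered_time)

    result = {}
    for key, times in groups.items():
        ordered = sorted(times, reverse=True)  # most recent first
        result[key] = ordered[1] if len(ordered) > 1 else None
    return result


# Hypothetical sample rows; ISO-formatted date strings sort chronologically.
rows = [
    (100, 1000, "2021-01-02"),
    (100, 1000, "2021-01-14"),
    (100, 1000, "2021-01-03"),
    (100, 1000, "2021-01-04"),
    (101, 2000, "2020-05-07"),
    (101, 2000, "2021-04-14"),
    (102, 3000, "2021-06-01"),  # single-row group: no second order exists
]

second = second_most_recent(rows)
print(second)
```

If a fallback is preferable to null for single-order groups, one option in Spark would be to wrap the expression in coalesce with whatever default suits the data.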
    
    

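    【Discussion】:

      【Solution 3】:

      One solution is to build a lookup table holding the second most recent order for every (CUSTOMER_ID, ADDRESS_ID) pair, and then join it with the original DataFrame.
      I assume your ORDERED_TIME column is already of timestamp type.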

      import pyspark.sql.functions as F
      from pyspark.sql.window import Window
      
      # define window: newest order first within each (customer, address) pair
      w = Window.partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))
      
      # create lookup table; rename the column so the join does not produce
      # two columns called ORDERED_TIME
      second_highest = df \
        .withColumn('rank', F.dense_rank().over(w)) \
        .filter(F.col('rank') == 2) \
        .select('CUSTOMER_ID', 'ADDRESS_ID', F.col('ORDERED_TIME').alias('second_recent_order'))
      
      # join with original dataframe
      df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')
      
      df.show()
      
      +-----------+----------+-------------------+-------------------+
      |CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|second_recent_order|
      +-----------+----------+-------------------+-------------------+
      |        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
      |        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
      |        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
      |        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
      |        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
      |        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
      +-----------+----------+-------------------+-------------------+
      

      Note: in your expected output you wrote 2021-04-14 19:14:08 for CUSTOMER_ID == 101, but the second most recent order is actually 2020-05-07 13:35:57, because that one is from 2020.
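
One caveat worth checking against your own data: dense_rank gives tied timestamps the same rank, so if two orders in a group share the second-highest ORDERED_TIME, the lookup table holds two rows for that group and the left join repeats every row of the group. The plain-Python sketch below (no Spark required) models the two ranking rules on a hypothetical list of times; the helper names are illustrative and are not Spark APIs.

```python
def dense_rank_desc(values):
    """Rank values in descending order; ties share a rank and ranks have no gaps."""
    distinct = sorted(set(values), reverse=True)
    rank_of = {value: i + 1 for i, value in enumerate(distinct)}
    return [(value, rank_of[value]) for value in sorted(values, reverse=True)]


def row_number_desc(values):
    """Number values 1..n in descending order; ties still get distinct numbers."""
    return [(value, i + 1) for i, value in enumerate(sorted(values, reverse=True))]


# Hypothetical group in which two orders share the second-highest time.
times = ["2021-01-14", "2021-01-04", "2021-01-04", "2021-01-02"]

dense_second = [value for value, rank in dense_rank_desc(times) if rank == 2]
row_second = [value for value, number in row_number_desc(times) if number == 2]

print(dense_second)  # two matching rows, so a join on the group keys would fan out
print(row_second)    # exactly one row
```

If such ties can occur, deduplicating the lookup table before the join (for example with dropDuplicates()) keeps one row per group. Note that the two rules also differ when the highest value itself is tied: row number 2 is then the same timestamp as the maximum, whereas dense rank 2 is the second distinct timestamp.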

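      【Discussion】:

        【Solution 4】:

        You can use two windows: an ordered one to number the rows, and an unordered one combined with the "first" function to pick out the second row (Scala):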

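        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions._
        import spark.implicits._  // assumes a SparkSession named spark, as in spark-shell

        val df2 = Seq(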
          (100, 158932441, "2021-01-02 13:35:57"),
          (100, 158932441, "2021-01-14 19:14:08"),
          (100, 158932441, "2021-01-03 13:33:52"),
          (100, 158932441, "2021-01-04 09:36:10"),
          (101, 281838494, "2020-05-07 13:35:57"),
          (101, 281838494, "2021-04-14 19:14:08")
        ).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")
        
        val sorted_order_times = Window
          .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
          .orderBy(desc("ORDERED_TIME"))
        
        val unsorted_order_times = Window
          .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
        
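        df2
          // number the rows of each group, newest first
          .withColumn("row_number", row_number().over(sorted_order_times))
          // keep ORDERED_TIME only on row 2, then take the first non-null value in the group
          .withColumn("second_recent_order",
            first(
              when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null), true
            ).over(unsorted_order_times))
          .drop("row_number")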
        

        Output:

        +-----------+----------+-------------------+-------------------+
        |CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME       |second_recent_order|
        +-----------+----------+-------------------+-------------------+
        |100        |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
        |100        |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
        |100        |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
        |100        |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
        |101        |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
        |101        |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
        +-----------+----------+-------------------+-------------------+
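
For readers who do not use Scala, the same two-step logic can be sketched without Spark: number the rows of each group in descending time order, keep the time only on row 2, then spread the first non-null value to every row of the group. This is a plain-Python model under that assumption, with a hypothetical function name (add_second_recent); it is not a translation to the PySpark API.

```python
from itertools import groupby


def add_second_recent(rows):
    """Append the group's second most recent time to every (customer, address, time) row."""
    def group_key(row):
        return (row[0], row[1])

    # Sort by time descending, then by group; the second sort is stable,
    # so rows stay newest-first within each group.
    ordered = sorted(rows, key=lambda row: row[2], reverse=True)
    ordered = sorted(ordered, key=group_key)

    out = []
    for _, group in groupby(ordered, key=group_key):
        group = list(group)
        # Step 1: row_number over the ordered window; keep the time only on row 2.
        marked = [time if number == 2 else None
                  for number, (_, _, time) in enumerate(group, start=1)]
        # Step 2: first non-null value over the whole partition.
        second = next((value for value in marked if value is not None), None)
        out.extend((customer, address, time, second)
                   for customer, address, time in group)
    return out


rows = [
    (100, 158932441, "2021-01-02 13:35:57"),
    (100, 158932441, "2021-01-14 19:14:08"),
    (100, 158932441, "2021-01-03 13:33:52"),
    (100, 158932441, "2021-01-04 09:36:10"),
    (101, 281838494, "2020-05-07 13:35:57"),
    (101, 281838494, "2021-04-14 19:14:08"),
]

result = add_second_recent(rows)
for row in result:
    print(row)
```

Because the second step ignores nulls and looks at the whole partition, the value found on row 2 reaches every row of the group, including row 1, which an ordered window with the default frame would not yet have seen.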
        

        【Discussion】:
