【问题标题】:PySpark Column Creation by queuing filtered past rows通过排队过滤过去的行来创建 PySpark 列
【发布时间】:2021-07-29 09:12:12
【问题描述】:

在 PySpark 中,我想在现有表中创建一个新列,用于存储标签为 1 的特定用户的最后 K 个文本。

    Example-

      Index | user_name |  text  | label   |
       0    |    u1     |   t0   |   0     |
       1    |    u1     |   t1   |   1     |
       2    |    u2     |   t2   |   0     |
       3    |    u1     |   t3   |   1     |
       4    |    u2     |   t4   |   0     |
       5    |    u2     |   t5   |   1     |
       6    |    u2     |   t6   |   1     |
       7    |    u1     |   t7   |   0     |
       8    |    u1     |   t8   |   1     |
       9    |    u1     |   t9   |   0     |

新列(text_list)后面的表应该如下,为每个用户存储最后K=2条消息。

     Index  | user_name |  text  | label   |   text_list     |
       0    |    u1     |   t0   |   0     |        []       |
       1    |    u1     |   t1   |   1     |        []       |
       2    |    u2     |   t2   |   0     |        []       |
       3    |    u1     |   t3   |   1     |       [t1]      |
       4    |    u2     |   t4   |   0     |        []       |
       5    |    u2     |   t5   |   1     |        []       |
       6    |    u2     |   t6   |   1     |       [t5]      |
       7    |    u1     |   t7   |   0     |       [t3, t1]  |
       8    |    u1     |   t8   |   1     |       [t3, t1]  |
       9    |    u1     |   t9   |   0     |       [t8, t3]  |

一种天真的方法是循环遍历每一行并为每个用户维护一个队列。但该表可能有数百万行。我们可以在不循环的情况下以更可扩展、更有效的方式做到这一点吗?

【问题讨论】:

    标签: pyspark apache-spark-sql


    【解决方案1】:

    如果您使用的是 spark 版本 >= 2.4,有一种方法可以尝试。假设df 是您的数据框。

    df.show()
    
    # +-----+---------+----+-----+
    # |Index|user_name|text|label|
    # +-----+---------+----+-----+
    # |    0|       u1|  t0|    0|
    # |    1|       u1|  t1|    1|
    # |    2|       u2|  t2|    0|
    # |    3|       u1|  t3|    1|
    # |    4|       u2|  t4|    0|
    # |    5|       u2|  t5|    1|
    # |    6|       u2|  t6|    1|
    # |    7|       u1|  t7|    0|
    # |    8|       u1|  t8|    1|
    # |    9|       u1|  t9|    0|
    # +-----+---------+----+-----+
    

    两步:

    1. 使用collect_list在窗口中获取列文本标签struct列表
    2. filter array where label = 1 并获取 text 值,使用 sort_array 对数组进行降序排序并使用 slice 获取前两个元素

    应该是这样的

    from pyspark.sql.functions import col, collect_list, struct, expr, sort_array, slice
    from pyspark.sql.window import Window
    
    # window : first row to row before current row
    w = Window.partitionBy('user_name').orderBy('index').rowsBetween(Window.unboundedPreceding, -1)
    
    df = (df
          .withColumn('text_list', collect_list(struct(col('text'), col('label'))).over(w))
          .withColumn('text_list', slice(sort_array(expr("FILTER(text_list, value -> value.label = 1).text"), asc=False), 1, 2))
          )
    df.sort('Index').show()
    
    # +-----+---------+----+-----+---------+
    # |Index|user_name|text|label|text_list|
    # +-----+---------+----+-----+---------+
    # |    0|       u1|  t0|    0|       []|
    # |    1|       u1|  t1|    1|       []|
    # |    2|       u2|  t2|    0|       []|
    # |    3|       u1|  t3|    1|     [t1]|
    # |    4|       u2|  t4|    0|       []|
    # |    5|       u2|  t5|    1|       []|
    # |    6|       u2|  t6|    1|     [t5]|
    # |    7|       u1|  t7|    0| [t3, t1]|
    # |    8|       u1|  t8|    1| [t3, t1]|
    # |    9|       u1|  t9|    0| [t8, t3]|
    # +-----+---------+----+-----+---------+
    

    【讨论】:

      【解决方案2】:

      感谢此处发布的解决方案。我稍微修改了它(因为它假设文本字段可以排序)并最终能够找到一个可行的解决方案。这里是:

      from pyspark.sql.window import Window
      from pyspark.sql.functions import col, when, collect_list, slice, reverse
      K = 2
      windowPast = Window.partitionBy("user_name").orderBy("Index").rowsBetween(Window.unboundedPreceding, Window.currentRow-1)
      df.withColumn("text_list", collect_list\
                    (when(col("label")==1,col("text"))\
                     .otherwise(F.lit(None)))\
                    .over(windowPast))\
      .withColumn("text_list", slice(reverse(col("text_list")), 1, K))\
      .sort(F.col("Index"))\
      .show()
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-01-21
        • 2019-02-28
        • 1970-01-01
        • 2021-06-09
        • 2022-01-23
        相关资源
        最近更新 更多