【问题标题】:Pyspark - groupby concat string columns by orderPyspark - groupby 按顺序连接字符串列
【发布时间】:2020-01-01 23:02:20
【问题描述】:

我有一个包含以下列的数据框 - 用户、订单、食物。

例如:

df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)

我想将所有食物连接成一个按顺序排序并按每个用户分组的字符串

如果我运行以下命令:

df.groupBy("User").agg(concat_ws(" $ ",collect_list("Food")).alias("Food List"))

我得到一个列表,但食物没有按顺序连接。

User Food List
B   Salad
C   Bread
A   Eggs $ Water $ Peaches

有什么好方法可以按顺序连接食物列表?

【问题讨论】:

标签: pyspark


【解决方案1】:

在这里尝试使用window

  1. 构建数据框
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)
df.show()

+----+-----+-------+
|User|Order|   Food|
+----+-----+-------+
|   A|    1|   Eggs|
|   B|    1|  Salad|
|   A|    2|Peaches|
|   C|    1|  Bread|
|   A|    3|  Water|
+----+-----+-------+

  1. 创建窗口并应用udf 来加入字符串:
w = Window.partitionBy('User').orderBy('Order').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

@pandas_udf(StringType(), PandasUDFType.GROUPED_AGG)
def _udf(v):
    return ' $ '.join(v)

df = df.withColumn('Food List', _udf(df['Food']).over(w)).dropDuplicates(['User', 'Food List']).drop(*['Order', 'Food'])
df.show(truncate=False)

+----+----------------------+
|User|Food List             |
+----+----------------------+
|B   |Salad                 |
|C   |Bread                 |
|A   |Eggs $ Peaches $ Water|
+----+----------------------+

【讨论】:

    【解决方案2】:

    根据可能的重复评论 - collect_list by preserving order based on another variable,我想出了一个解决方案。

    首先定义一个排序函数。这需要一个结构,按顺序排序,然后以字符串格式返回项目列表,以'$'分隔

    # define udf
    def sorter(l):
      res = sorted(l, key=lambda x: x.Order)
      return ' $ '.join([item[1] for item in res])
    
    sort_udf = udf(sorter,StringType())
    

    然后创建结构体并运行排序函数:

    SortedFoodList = (df.groupBy("User")
                        .agg(collect_list(struct("Order","Food")).alias("food_list"))
                        .withColumn("sorted_foods",sort_udf("food_list"))
                        .drop("food_list)
                      )
    

    【讨论】:

      猜你喜欢
      • 2016-04-21
      • 2022-04-06
      • 2021-05-29
      • 1970-01-01
      • 2023-02-10
      • 2015-11-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多