对于这样一个简单的问题,您还可以使用explode 函数。不过,我不知道与所选 udf 答案相比的性能特征。
from pyspark.sql import functions as F
df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(['store', 'values'])
df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# | 1| 1|
# | 1| 2|
# | 1| 3|
# | 1| 4|
# | 1| 5|
# | 1| 6|
# | 2| 2|
# | 2| 3|
# +-----+------+
df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store| values |
# +-----+------------------+
# |1 |[4, 5, 6, 1, 2, 3]|
# |2 |[2, 3] |
# +-----+------------------+
注意:您可以在聚合中使用 F.collect_set() 或在 df2 上使用 .drop_duplicates() 来删除重复值。
如果您想在收集的列表中维护有序值,我在另一个 SO 答案中找到了以下方法:
from pyspark.sql.window import Window
w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1 |1 |[1] |
# |1 |2 |[1, 2] |
# |1 |3 |[1, 2, 3] |
# |1 |4 |[1, 2, 3, 4] |
# |1 |5 |[1, 2, 3, 4, 5] |
# |1 |6 |[1, 2, 3, 4, 5, 6] |
# |2 |2 |[2] |
# |2 |3 |[2, 3] |
# +-----+------+-------------------+
df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
如果值本身不能确定顺序,您可以使用F.posexplode() 并在窗口函数中使用'pos' 列而不是'values' 来确定顺序。注意:您还需要一个更高级别的 order 列来对原始数组进行排序,然后使用数组中的位置来对数组的元素进行排序。
df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2) , (2, [2], 1),(2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values |array_order|
# +-----+---------+-----------+
# |1 |[1, 2, 3]|1 |
# |1 |[4, 5, 6]|2 |
# |2 |[2] |1 |
# |2 |[3] |2 |
# +-----+---------+-----------+
df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1 |[1, 2, 3]|1 |0 |1 |
# |1 |[1, 2, 3]|1 |1 |2 |
# |1 |[1, 2, 3]|1 |2 |3 |
# |1 |[4, 5, 6]|2 |0 |4 |
# |1 |[4, 5, 6]|2 |1 |5 |
# |1 |[4, 5, 6]|2 |2 |6 |
# |2 |[2] |1 |0 |2 |
# |2 |[3] |2 |0 |3 |
# +-----+---------+-----------+---+---+
w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1 |[1, 2, 3]|1 |0 |1 |[1] |
# |1 |[1, 2, 3]|1 |1 |2 |[1, 2] |
# |1 |[1, 2, 3]|1 |2 |3 |[1, 2, 3] |
# |1 |[4, 5, 6]|2 |0 |4 |[1, 2, 3, 4] |
# |1 |[4, 5, 6]|2 |1 |5 |[1, 2, 3, 4, 5] |
# |1 |[4, 5, 6]|2 |2 |6 |[1, 2, 3, 4, 5, 6] |
# |2 |[2] |1 |0 |2 |[2] |
# |2 |[3] |2 |0 |3 |[2, 3] |
# +-----+---------+-----------+---+---+-------------------+
df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
编辑:如果您想保留一些列以备不时之需,并且不需要汇总,您可以将它们包含在 groupBy 中或在汇总后重新加入它们(示例如下)。如果它们确实需要聚合,只需按 'store' 分组,然后在 'other' 列上添加您需要的任何聚合函数到 .agg() 调用。
from pyspark.sql import functions as F
df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a') , (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store| values|other|
# +-----+---------+-----+
# | 1|[1, 2, 3]| a|
# | 1|[4, 5, 6]| a|
# | 2| [2]| b|
# | 2| [3]| b|
# +-----+---------+-----+
df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# | 1| 1| a|
# | 1| 2| a|
# | 1| 3| a|
# | 1| 4| a|
# | 1| 5| a|
# | 1| 6| a|
# | 2| 2| b|
# | 2| 3| b|
# +-----+------+-----+
df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other| values|
# +-----+-----+------------------+
# | 1| a|[1, 2, 3, 4, 5, 6]|
# | 2| b| [2, 3]|
# +-----+-----+------------------+
df4 = (
df.drop('values')
.join(
df2.groupBy('store')
.agg(F.collect_list('values').alias('values')),
on=['store'], how='inner'
)
.drop_duplicates()
)
# +-----+-----+------------------+
# |store|other| values|
# +-----+-----+------------------+
# | 1| a|[1, 2, 3, 4, 5, 6]|
# | 2| b| [2, 3]|
# +-----+-----+------------------+