【发布时间】:2018-03-22 23:46:28
【问题描述】:
我需要通过将每个组中某个列中的值收集到一个集合中来聚合 DataFrame 中的行。 pyspark.sql.functions.collect_set 正是我所需要的。
但是,我需要对两列依次执行此操作,因为我需要将输入按一列分组,将每个组按另一列划分为子组,并对每个子组进行一些聚合。我不知道如何让collect_set 为每个组创建一个集合。
例子:
df = spark.createDataFrame([('a', 'x', 11, 22), ('a', 'y', 33, 44), ('b', 'x', 55, 66), ('b', 'y', 77, 88),('a','x',12,23),('a','y',34,45),('b','x',56,67),('b','y',78,89)], ('col1', 'col2', 'col3', 'col4'))
df.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| a| x| 11| 22|
| a| y| 33| 44|
| b| x| 55| 66|
| b| y| 77| 88|
| a| x| 12| 23|
| a| y| 34| 45|
| b| x| 56| 67|
| b| y| 78| 89|
+----+----+----+----+
g1 = df.groupBy('col1', 'col2').agg(collect_set('col3'),collect_set('col4'))
g1.show()
+----+----+-----------------+-----------------+
|col1|col2|collect_set(col3)|collect_set(col4)|
+----+----+-----------------+-----------------+
| a| x| [12, 11]| [22, 23]|
| b| y| [78, 77]| [88, 89]|
| a| y| [33, 34]| [45, 44]|
| b| x| [56, 55]| [66, 67]|
+----+----+-----------------+-----------------+
g2 = g1.groupBy('col1').agg(collect_set('collect_set(col3)'),collect_set('collect_set(col4)'),count('col2'))
g2.show(truncate=False)
+----+--------------------------------------------+--------------------------------------------+-----------+
|col1|collect_set(collect_set(col3)) |collect_set(collect_set(col4)) |count(col2)|
+----+--------------------------------------------+--------------------------------------------+-----------+
|b |[WrappedArray(56, 55), WrappedArray(78, 77)]|[WrappedArray(66, 67), WrappedArray(88, 89)]|2 |
|a |[WrappedArray(33, 34), WrappedArray(12, 11)]|[WrappedArray(22, 23), WrappedArray(45, 44)]|2 |
+----+--------------------------+--------------------------------------------+-----------+
我希望结果看起来更像
+----+----------------+----------------+-----------+
|col1| ...col3... | ...col4... |count(col2)|
+----+----------------+----------------+-----------+
|b |[56, 55, 78, 77]|[66, 67, 88, 89]|2 |
|a |[33, 34, 12, 11]|[22, 23, 45, 44]|2 |
+----+----------------+----------------+-----------+
但我没有看到一个聚合函数来获取两个或更多集合的 union,或者一个 pyspark 操作来展平显示在g2 中的“数组数组”结构。
pyspark 是否提供了一种简单的方法来实现这一点?还是我应该采取完全不同的方法?
【问题讨论】:
-
事后能不能写个udf来展平?
-
@hoyland 这是一个想法,我正在研究它......
标签: apache-spark pyspark