【问题标题】:Window function and conditional filters in PySparkPySpark 中的窗口函数和条件过滤器
【发布时间】:2019-11-20 22:16:15
【问题描述】:
有没有办法有条件地将过滤器应用于 pyspark 中的窗口函数?对于col1 中的每个组,我只想保留col2 中具有X 的行。如果一个组在col2 中没有X,我想保留该组中的所有行。
+------+------+
| col1 | col2 |
+------+------+
| A | |
+------+------+
| A | X |
+------+------+
| A | |
+------+------+
| B | |
+------+------+
| B | |
+------+------+
| B | |
+------+------+
【问题讨论】:
标签:
apache-spark
pyspark
window-functions
【解决方案1】:
您可以使用max 窗口函数来表示该组(由 col1 分区)在 col2 中具有标识符(在本例中为 1)具有“X”。没有“X”的组将被分配null。此后只需过滤中间数据帧即可获得所需的结果。
from pyspark.sql import Window
from pyspark.sql.functions import max,when
w = Window.partitionBy(df.col1)
df_1 = df.withColumn('x_exists',max(when(df.col2 == 'X',1)).over(w))
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()
【解决方案2】:
使用collect_list 和更多SQL 语法的替代方法:collect_list 跳过NULL 值,我们使用if(col2='X',1,NULL) 作为列表项,这样当col2 中没有显示“X”时,此collect_list 的大小为零:
from pyspark.sql.functions import expr
df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) \
.filter("col2 = 'X' OR !has_X")