【问题标题】:Window.rowsBetween - only consider rows fulfilling a specific condition (e.g. not being null)Window.rowsBetween - 只考虑满足特定条件的行(例如不为空)
【发布时间】:2019-04-23 04:10:27
【问题描述】:

问题

我有一个 Spark DataFrame,其中有一列不是每一行的值,而是仅包含某些行的值(在一定程度上,例如,基于 id 仅每 5 到 10 行)。

现在,我想对包含值的行应用一个窗口函数,这些值涉及前两行和后两行也包含值(所以基本上假设所有包含空值的行都不存在 = 不计入窗口的rowsBetween-范围)。在实践中,我的有效窗口大小可能是任意的,具体取决于存在多少包含空值的行。但是,我总是需要前后两个值。此外,最终结果应该包含所有行,因为其他列包含重要信息。

示例

例如,我想计算以下数据帧中非空行的前两个、当前和下两个(非空)值的总和:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=i, val=i * 2 if i % 5 == 0 else None, foo='other') for i in range(100)])
df.show()

输出:

+-----+---+----+
|  foo| id| val|
+-----+---+----+
|other|  0|   0|
|other|  1|null|
|other|  2|null|
|other|  3|null|
|other|  4|null|
|other|  5|  10|
|other|  6|null|
|other|  7|null|
|other|  8|null|
|other|  9|null|
|other| 10|  20|
|other| 11|null|
|other| 12|null|
|other| 13|null|
|other| 14|null|
|other| 15|  30|
|other| 16|null|
|other| 17|null|
|other| 18|null|
|other| 19|null|
+-----+---+----+

如果我只是按原样在数据帧上使用 Window 函数,我无法指定值不能为 null 的条件,因此窗口仅包含使总和等于行值的 null 值:

df2 = df.withColumn('around_sum', F.when(F.col('val').isNotNull(), F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id')))).otherwise(None))
df2.show()

结果:

+-----+---+----+----------+
|  foo| id| val|around_sum|
+-----+---+----+----------+
|other|  0|   0|         0|
|other|  1|null|      null|
|other|  2|null|      null|
|other|  3|null|      null|
|other|  4|null|      null|
|other|  5|  10|        10|
|other|  6|null|      null|
|other|  7|null|      null|
|other|  8|null|      null|
|other|  9|null|      null|
|other| 10|  20|        20|
|other| 11|null|      null|
|other| 12|null|      null|
|other| 13|null|      null|
|other| 14|null|      null|
|other| 15|  30|        30|
|other| 16|null|      null|
|other| 17|null|      null|
|other| 18|null|      null|
|other| 19|null|      null|
+-----+---+----+----------+

我能够通过创建仅包含值不为空的行的第二个数据框来实现所需的结果,在那里执行窗口操作,然后再次加入结果:

df3 = df.where(F.col('val').isNotNull())\
    .withColumn('around_sum', F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))\
    .select(F.col('around_sum'), F.col('id').alias('id2'))
df3 = df.join(df3, F.col('id') == F.col('id2'), 'outer').orderBy(F.col('id')).drop('id2')
df3.show()

结果:

+-----+---+----+----------+
|  foo| id| val|around_sum|
+-----+---+----+----------+
|other|  0|   0|        30|
|other|  1|null|      null|
|other|  2|null|      null|
|other|  3|null|      null|
|other|  4|null|      null|
|other|  5|  10|        60|
|other|  6|null|      null|
|other|  7|null|      null|
|other|  8|null|      null|
|other|  9|null|      null|
|other| 10|  20|       100|
|other| 11|null|      null|
|other| 12|null|      null|
|other| 13|null|      null|
|other| 14|null|      null|
|other| 15|  30|       150|
|other| 16|null|      null|
|other| 17|null|      null|
|other| 18|null|      null|
|other| 19|null|      null|
+-----+---+----+----------+

问题

现在我想知道是否可以以某种方式摆脱联接(和第二个 DataFrame),而是直接在 Window 函数中指定条件。

这可能吗?

【问题讨论】:

  • 你考虑过fillna() 吗?
  • @Bala:不知道如何使用 fillna() 来达到这个目的——我真的不想填充空值。我还研究了使用最后/第一个,但我还没有找到一个可以处理超过 +/- 一个“周围”值的解决方案(但在这种情况下我需要两个)。

标签: python apache-spark pyspark window-functions


【解决方案1】:

一个好的解决方案是从用 0 填充空值开始,然后执行操作。只对所涉及的列进行填充,如下所示:

df = df.fillna(0,subset=['val'])

如果不确定是否要去除空值,请复制列值,然后计算该列上的窗口,以便在操作后去除它。

像这样:

df = df.withColumn('val2',F.col('val'))
df = df.fillna(0,subset=['val2'])
# Then perform the operations over val2.
df = df.withColumn('around_sum', F.sum(F.col('val2')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))
# After the operations, get rid of the copy column
df = df.drop('val2')

【讨论】:

  • 感谢您的回复!结果看起来与我的预期结果非常不同,但是:当我运行它时,around_sum == 我的示例数据框的值。
猜你喜欢
  • 1970-01-01
  • 2017-03-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-13
  • 1970-01-01
  • 2022-08-18
相关资源
最近更新 更多