【问题标题】:PySpark Window Function Null handlingPySpark 窗口函数 Null 处理
【发布时间】:2022-01-25 10:40:16
【问题描述】:

我正在尝试使用 pyspark==3.2.0 中的窗口函数处理空值。

csv格式的原始数据为:

key1,key2,client_id,event_timestamp
1D7B****-****-****-****-******EC1E09,,9397****-****-****-****-******BFACBB,2021-02-25T16:04:12.391Z
1D7B****-****-****-****-******EC1E09,AE8D****-****-****-****-******3E7E75,9397****-****-****-****-******BFACBB,2021-02-25T02:15:44.587Z
1D7B****-****-****-****-******EC1E09,,9397****-****-****-****-******BFACBB,2021-02-25T02:19:59.084Z
1D7B****-****-****-****-******EC1E09,,9397****-****-****-****-******BFACBB,2021-02-25T02:31:07.170Z

我必须根据key1event_timestamp 得到最后一个key2client_id。目前我写的pyspark代码是:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

sc = SparkSession
    .builder
    .appName("test run")
    .getOrCreate()

df = sc.read.csv('my-csv.csv')

df.select(
    F.col('key1'),
    F.last('key2', False).over(
        Window.partitionBy('key1').orderBy(F.col('event_timestamp').desc())
    ).alias('last_key2'),
    F.last('client_id', False).over(
        Window.partitionBy('key1').orderBy(F.col('event_timestamp').desc())
    ).alias('last_client_id')
)

但结果返回 2 行数据——key2 的一行为空,key2 的一行非空。

key1,last_key2,last_client_id
1D7B****-****-****-****-******EC1E09,null,9397****-****-****-****-******BFACBB
1D7B****-****-****-****-******EC1E09,AE8D****-****-****-****-******3E7E75,9397****-****-****-****-******BFACBB

这里是预期的结果,因为如果我们仔细观察,last_key2 为空。

key1,last_key2,last_client_id
1D7B****-****-****-****-******EC1E09,null,9397****-****-****-****-******BFACBB

如果我使用按key1 分区的窗口函数,为什么 Spark 返回 2 行?如何编写代码以获得预期的结果?

【问题讨论】:

  • 是否要为每个key1 选择具有最高event_timestamp 的行?
  • 下面的帖子能回答你的问题吗?

标签: python dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

您在descending 中订购窗口,但使用last 函数,这就是您获得key2 的非空值的原因。 last 函数根据您的顺序为您提供窗口框架中的最后一个值。你想在这里使用的是first函数或者把排序改成ascending

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('key1').orderBy(F.col('event_timestamp').desc())

df.select(
    F.col('key1'),
    F.first('key2', ignorenulls=False).over(w).alias('last_key2'),
    F.first('client_id', ignorenulls=False).over(w).alias('last_client_id')
).show(truncate=False)

请注意,使用这样的窗口将始终为每个 key1 返回多行,因为没有分组依据或过滤。您需要在选择后添加distinct()


话虽如此,对于这种情况,您只需在定义的同一窗口规范上使用row_number,然后将列key2client_id 分别重命名为last_key2last_client_id

df.withColumn(
    "rn",
    F.row_number().over(Window.partitionBy('key1').orderBy(F.col('event_timestamp').desc()))
).filter("rn = 1").select(
    F.col('key1'),
    F.col('key2').alias("last_key2"),
    F.col('client_id').alias("last_client_id")
).show(truncate=False)

#+------------------------------------+---------+------------------------------------+
#|key1                                |last_key2|last_client_id                      |
#+------------------------------------+---------+------------------------------------+
#|1D7B****-****-****-****-******EC1E09|null     |9397****-****-****-****-******BFACBB|
#+------------------------------------+---------+------------------------------------+

【讨论】:

    猜你喜欢
    • 2018-03-14
    • 2019-08-16
    • 2019-09-21
    • 2023-02-23
    • 2017-03-17
    • 2016-05-10
    • 2018-01-25
    • 2018-07-11
    • 1970-01-01
    相关资源
    最近更新 更多