【Posted】: 2020-11-05 01:29:51
【Question】:
I have this DataFrame:
df = spark.createDataFrame(
    [
        (1, 12345, "a@gmail.com", "2020-01-01"),
        (1, 12345, "a@gmail.com", "2020-01-02"),
        (1, 23456, "a@gmail.com", "2020-01-03"),
        (1, 34567, "a@gmail.com", "2020-01-04"),
        (1, 12345, "a@gmail.com", "2020-01-05"),
        (1, 45678, "a@gmail.com", "2020-01-06"),
        (1, 45678, "a@gmail.com", "2020-01-07"),
        (2, 56789, "b@gmail.com", "2020-01-01"),
        (2, 56789, "b@gmail.com", "2020-01-02"),
        (2, 56789, "c@gmail.com", "2020-01-03"),
        (2, 67890, "c@gmail.com", "2020-01-04"),
        (2, 67890, "c@gmail.com", "2020-01-05"),
        (3, 78901, "d@gmail.com", "2020-01-01"),
        (3, 78901, "d@gmail.com", "2020-01-02"),
        (3, 78901, "d@gmail.com", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)
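From it, I want to select every row that is either the first date for its ID, or whose phone number or email address has changed since the previous date for that ID.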
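To pin down which rows should survive, here is a minimal plain-Python sketch of the rule described above, applied to the same sample data. The helper name `select_first_or_changed` is made up for illustration, and the sketch assumes the dates are ISO-formatted strings, so that sorting them as strings matches chronological order.

```python
from itertools import groupby
from operator import itemgetter

# Same sample rows as the DataFrame: (id, phone_number, email, date)
rows = [
    (1, 12345, "a@gmail.com", "2020-01-01"),
    (1, 12345, "a@gmail.com", "2020-01-02"),
    (1, 23456, "a@gmail.com", "2020-01-03"),
    (1, 34567, "a@gmail.com", "2020-01-04"),
    (1, 12345, "a@gmail.com", "2020-01-05"),
    (1, 45678, "a@gmail.com", "2020-01-06"),
    (1, 45678, "a@gmail.com", "2020-01-07"),
    (2, 56789, "b@gmail.com", "2020-01-01"),
    (2, 56789, "b@gmail.com", "2020-01-02"),
    (2, 56789, "c@gmail.com", "2020-01-03"),
    (2, 67890, "c@gmail.com", "2020-01-04"),
    (2, 67890, "c@gmail.com", "2020-01-05"),
    (3, 78901, "d@gmail.com", "2020-01-01"),
    (3, 78901, "d@gmail.com", "2020-01-02"),
    (3, 78901, "d@gmail.com", "2020-01-03"),
]


def select_first_or_changed(rows):
    """Hypothetical helper: keep a row if it is the earliest row for its id,
    or if its phone_number or email differs from the previous row of that id."""
    kept = []
    # Sort by id, then by date (assumes ISO date strings).
    ordered = sorted(rows, key=itemgetter(0, 3))
    for _, group in groupby(ordered, key=itemgetter(0)):
        previous = None
        for row in group:
            if previous is None or row[1] != previous[1] or row[2] != previous[2]:
                kept.append(row)
            previous = row
    return kept


expected = select_first_or_changed(rows)
for row in expected:
    print(row)
```

On this data the sketch keeps nine rows: five for ID 1 (the rows dated 01, 03, 04, 05 and 06), three for ID 2 (01, 03 and 04) and one for ID 3 (01).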
I got this working by creating a temporary view and running a raw SQL query against it, like this:
df.createOrReplaceTempView("df")
df = spark.sql(
    """
    SELECT a.*
    FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
    LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
        ON a.row = b.row + 1 AND a.id = b.id
    WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
    """
)
However, I would prefer to get the same result using pure PySpark functions. How can I translate this SQL query into PySpark?
Here is what I have tried so far:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
    (a.phone_number != b.phone_number)
    | (b.phone_number.isNull())
    | (a.email != b.email)
    | (b.email.isNull())
)
【Comments】:
By "first date", do you mean the oldest one?
@Steven Yes, the oldest record for each ID, plus every record for each ID that has changed since the previous date.
Tags: sql apache-spark pyspark