【Question title】: How can I convert this SQL query into PySpark code?
【Posted】: 2020-11-05 01:29:51
【Question】:

I have this table:

df = spark.createDataFrame(
    [
        (1, 12345, "a@gmail.com", "2020-01-01"),
        (1, 12345, "a@gmail.com", "2020-01-02"),
        (1, 23456, "a@gmail.com", "2020-01-03"),
        (1, 34567, "a@gmail.com", "2020-01-04"),
        (1, 12345, "a@gmail.com", "2020-01-05"),
        (1, 45678, "a@gmail.com", "2020-01-06"),
        (1, 45678, "a@gmail.com", "2020-01-07"),
        (2, 56789, "b@gmail.com", "2020-01-01"),
        (2, 56789, "b@gmail.com", "2020-01-02"),
        (2, 56789, "c@gmail.com", "2020-01-03"),
        (2, 67890, "c@gmail.com", "2020-01-04"),
        (2, 67890, "c@gmail.com", "2020-01-05"),
        (3, 78901, "d@gmail.com", "2020-01-01"),
        (3, 78901, "d@gmail.com", "2020-01-02"),
        (3, 78901, "d@gmail.com", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)

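From it, I want to select every row that is either the first date for its ID, or whose phone number or email address has changed since the previous date for that ID.

I achieved this by creating a temporary view and running a raw SQL query against it, like this: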

df.createOrReplaceTempView("df")

df = spark.sql(
    """
    SELECT  a.*
    FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
    LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
    ON a.row = b.row + 1 AND a.id = b.id
    WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
    """
)

However, I would prefer to get the same result using only PySpark functions. How can I translate this SQL query into PySpark?

This is what I have tried so far:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))

df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
    (a.phone_number != b.phone_number)
    | (b.phone_number.isNull())
    | (a.email != b.email)
    | (b.email.isNull())
)

【Comments】:

  • By "first date", do you mean the oldest?
  • @Steven Yes, the oldest record for each ID, plus every record for that ID that has changed since the previous date.

Tags: sql apache-spark pyspark


【Solution 1】:

I would do it a little differently. Rather than following your SQL, apply your business rule directly:

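from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per id, ordered by date, shared by row_number and lag.
w = Window.partitionBy("id").orderBy("date")

df.withColumn(
    "rnk", F.row_number().over(w)
).withColumn(
    # Previous row's (phone_number, email) within the same id; null on the first row.
    "old", F.lag(F.struct([F.col("phone_number"), F.col("email")])).over(w)
).where(
    (F.col("rnk") == 1)
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).show()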

+---+------------+-----------+----------+---+--------------------+
| id|phone_number|      email|      date|rnk|                 old|
+---+------------+-----------+----------+---+--------------------+
|  1|       12345|a@gmail.com|2020-01-01|  1|                null|
|  1|       23456|a@gmail.com|2020-01-03|  3|[12345, a@gmail.com]|
|  1|       34567|a@gmail.com|2020-01-04|  4|[23456, a@gmail.com]|
|  1|       12345|a@gmail.com|2020-01-05|  5|[34567, a@gmail.com]|
|  1|       45678|a@gmail.com|2020-01-06|  6|[12345, a@gmail.com]|
|  3|       78901|d@gmail.com|2020-01-01|  1|                null|
|  2|       56789|b@gmail.com|2020-01-01|  1|                null|
|  2|       56789|c@gmail.com|2020-01-03|  3|[56789, b@gmail.com]|
|  2|       67890|c@gmail.com|2020-01-04|  4|[56789, c@gmail.com]|
+---+------------+-----------+----------+---+--------------------+

NB: You can replace the test on rnk with a test on F.col("old").isNull(), so you do not have to compute rnk at all.
