【问题标题】:How to translate SQL UPDATE query which uses inner join into PySpark?如何将使用内部联接的 SQL UPDATE 查询转换为 PySpark?
【发布时间】:2022-11-06 22:29:09
【问题描述】:

我有两个要转换为 PySpark 的 MS Access SQL 查询。查询如下所示(我们有两个表 Employee 和 Department):

UPDATE EMPLOYEE INNER JOIN [DEPARTMENT] ON
EMPLOYEE.STATEPROVINCE = [DEPARTMENT].[STATE_LEVEL] 
SET EMPLOYEE.STATEPROVINCE = [DEPARTMENT]![STATE_ABBREVIATION];
UPDATE EMPLOYEE INNER JOIN [DEPARTMENT] ON
EMPLOYEE.STATEPROVINCE = [DEPARTMENT].[STATE_LEVEL] 
SET EMPLOYEE.MARKET = [DEPARTMENT]![MARKET];

【问题讨论】:

  • 你想更新什么?数据框只是选择的结果,因此您可以使用简单的连接语句“更新”您的数据框。如果要更新存储在某处的表,则取决于目标技术。
  • 我首先将此 sql 数据加载到我的数据湖中,然后将其加载到数据帧中

标签: apache-spark join pyspark sql-update azure-databricks


【解决方案1】:

测试数据框:

from pyspark.sql import functions as F

df_emp = spark.createDataFrame([(1, 'a'), (2, 'bb')], ['EMPLOYEE', 'STATEPROVINCE'])
df_emp.show()
# +--------+-------------+
# |EMPLOYEE|STATEPROVINCE|
# +--------+-------------+
# |       1|            a|
# |       2|           bb|
# +--------+-------------+

df_dept = spark.createDataFrame([('bb', 'b')], ['STATE_LEVEL', 'STATE_ABBREVIATION'])
df_dept.show()
# +-----------+------------------+
# |STATE_LEVEL|STATE_ABBREVIATION|
# +-----------+------------------+
# |         bb|                 b|
# +-----------+------------------+

在 Microsoft Access 中运行 SQL 查询会执行以下操作:

在 PySpark 中,你可以像这样得到它:

df = (df_emp.alias('a')
    .join(df_dept.alias('b'), df_emp.STATEPROVINCE == df_dept.STATE_LEVEL, 'left')
    .select(
        *[c for c in df_emp.columns if c != 'STATEPROVINCE'],
        F.coalesce('b.STATE_ABBREVIATION', 'a.STATEPROVINCE').alias('STATEPROVINCE')
    )
)
df.show()
# +--------+-------------+
# |EMPLOYEE|STATEPROVINCE|
# +--------+-------------+
# |       1|            a|
# |       2|            b|
# +--------+-------------+

首先你做一个左join。然后,select

select 有 2 个部分。

  • 首先,从df_emp 中选择除“STATEPROVINCE”之外的所有内容。
  • 然后,对于新的“STATEPROVINCE”,从df_dept 中选择“STATE_ABBREVIATION”,但如果它为空(即df_dept 中不存在),则从df_emp 中选择“STATEPROVINCE”。

对于第二个查询,您只需更改 select 语句中的值:

df = (df_emp.alias('a')
    .join(df_dept.alias('b'), df_emp.STATEPROVINCE == df_dept.STATE_LEVEL, 'left')
    .select(
        *[c for c in df_emp.columns if c != 'MARKET'],
        F.coalesce('b.MARKET', 'a.MARKET').alias('MARKET')
    )
)

【讨论】:

  • 嗨,我想在这里使用内连接来获得我想要的结果,或者我们必须只使用左连接
  • 你不需要内部连接。这仅在 Access 中需要,作为显示要更新哪些行的一种方式。但是在 PySpark 中你不需要它,因为它会导致更多问题。
  • 请尝试分析答案。差别不大,你可以自己做!这次我已经做到了-我已经更新了答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-01-18
  • 2010-10-10
  • 1970-01-01
  • 2021-02-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多