【问题标题】:Update a column in PySpark while doing multiple inner joins?在执行多个内部联接时更新 PySpark 中的列?
【发布时间】:2022-11-03 18:32:49
【问题描述】:

我有一个 SQL 查询,我正在尝试将其转换为 PySpark。在 SQL 查询中,我们正在加入表并更新匹配的列。 SQL 查询如下所示:

UPDATE [DEPARTMENT_DATA]
INNER JOIN ([COLLEGE_DATA]
            INNER JOIN [STUDENT_TABLE]
            ON COLLEGE_DATA.UNIQUEID = STUDENT_TABLE.PROFESSIONALID)
ON DEPARTMENT_DATA.PUBLICID = COLLEGE_DATA.COLLEGEID
SET STUDENT_TABLE.PRIVACY = "PRIVATE"

我试过的逻辑:

df_STUDENT_TABLE = (
    df_STUDENT_TABLE.alias('a')
    .join(
        df_COLLEGE_DATA('b'),
        on=F.col('a.PROFESSIONALID') == F.col('b.UNIQUEID'),
        how='left',
    )
    .join(
        df_DEPARTMENT_DATA.alias('c'),
        on=F.col('b.COLLEGEID') == F.col('c.PUBLICID'),
        how='left',
    )
    .select(
        *[F.col(f'a.{c}') for c in df_STUDENT_TABLE.columns],
        F.when(
            F.col('b.UNIQUEID').isNotNull() & F.col('c.PUBLICID').isNotNull()
            F.lit('PRIVATE')
        ).alias('PRIVACY')
    )
)

此代码正在添加一个新列“PRIVACY”,但在运行后给出空值。

【问题讨论】:

  • 您能否编辑您的问题以添加示例数据、预期输出和您得到的输出?
  • 如果我们的连接运算符的 ON 条件满足,我想要的预期输出我们必须更新数据框中的列 PRIVACY,其值为“PRIVATE”。
  • PRIVACY Column 已经存在于我们的 Dataframe 中并且当前为 Null,我们正在通过应用连接条件将值加载到 Column 中

标签: apache-spark pyspark sql-update inner-join azure-databricks


【解决方案1】:
  • 我已经获取了一些样本数据,当我使用条件应用连接时,我得到的结果如下(要求是以下记录的隐私需要设置为PRIVATE)
%sql

select student.*,college.*,department.* from department INNER JOIN college INNER JOIN student
ON college.unique_id = student.professional_id and department.public_id = college.college_id


  • 当我使用您的代码(相同的逻辑)时,我得到了相同的输出,即在数据框中添加了一个具有所需值的附加列,而实际的 privacy 列具有空值。
from pyspark.sql.functions import col,when,lit

df_s = df_s.alias('a').join(df_c.alias('b'), col('a.professional_id') == col('b.unique_id'),'left').join(df_d.alias('c'), col('b.college_id') == col('c.public_id'),'left').select(*[col(f'a.{c}') for c in df_s.columns],when(col('b.unique_id').isNotNull() & col('c.public_id').isNotNull(), 'PRIVATE').otherwise(col('a.privacy')).alias('req_value'))
df_s.show()


  • 由于从上面看,req_value是需要值的列,这些值需要反映在privacy中,你可以直接使用下面的代码。
final = df_s.withColumn('privacy',col('req_value')).select([column for column in df_s.columns if column!='req_value'])
final.show()

更新:

您还可以使用以下代码,其中我使用 withColumn 而不是 select 更新了列。

df_s = df_s.alias('a').join(df_c.alias('b'), col('a.professional_id') == col('b.unique_id'),'left').join(df_d.alias('c'), col('b.college_id') == col('c.public_id'),'left').withColumn('privacy',when(col('b.unique_id').isNotNull() & col('c.public_id').isNotNull(), 'PRIVATE').otherwise(col('privacy'))).select(*df_s.columns)

#or you can use this as well, without using alias.
#df_s = df_s.join(df_c, df_s['professional_id'] == df_c['unique_id'],'left').join(df_d, df_c['college_id'] == df_d['public_id'],'left').withColumn('privacy',when(df_c['unique_id'].isNotNull() & df_d['public_id'].isNotNull(), 'PRIVATE').otherwise(df_s['privacy'])).select(*df_s.columns)


df_s.show()

【讨论】:

  • 为什么我们使用这个 Req_value 列,我们希望隐私列中的数据
  • 当我们在代码中使用 alias('PRIVACY') 时(我使用了 alias('req_column')),它会创建一个新列,但不会更新 privacy 列。并且访问该列也是模棱两可的(因为两个列具有相同的名称)。 req_column 中的值是隐私列实际需要的更新值。所以,我所做的是我将req_column 值分配给privacy 列,并且最初只选择了存在 int df_student 数据框的列。
  • 我已经更新了我直接使用withColumn() 更新隐私列的答案。请检查。
【解决方案2】:

加入后,您可以使用nvl2。它可以检查与最后一个数据帧(df_dept)的连接是否成功,如果是,则可以返回“PRIVATE”,否则返回来自df_stud.PRIVACY的值。

输入:

from pyspark.sql import functions as F
df_stud = spark.createDataFrame([(1, 'x'), (2, 'STAY')], ['PROFESSIONALID', 'PRIVACY'])
df_college = spark.createDataFrame([(1, 1)], ['COLLEGEID', 'UNIQUEID'])
df_dept = spark.createDataFrame([(1,)], ['PUBLICID'])

df_stud.show()
# +--------------+-------+
# |PROFESSIONALID|PRIVACY|
# +--------------+-------+
# |             1|      x|
# |             2|   STAY|
# +--------------+-------+

脚本:

df = (df_stud.alias('s')
    .join(df_college.alias('c'), F.col('s.PROFESSIONALID') == F.col('c.UNIQUEID'), 'left')
    .join(df_dept.alias('d'), F.col('c.COLLEGEID') == F.col('d.PUBLICID'), 'left')
    .select(
        *[f's.`{c}`' for c in df_stud.columns if c != 'PRIVACY'],
        F.expr("nvl2(d.PUBLICID, 'PRIVATE', s.PRIVACY) PRIVACY")
    )
)
df.show()
# +--------------+-------+
# |PROFESSIONALID|PRIVACY|
# +--------------+-------+
# |             1|PRIVATE|
# |             2|   STAY|
# +--------------+-------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-06-28
    • 2013-06-23
    • 1970-01-01
    • 2013-05-22
    • 1970-01-01
    • 2014-01-21
    • 1970-01-01
    相关资源
    最近更新 更多