【问题标题】:Merge and replace elements of two dataframes using PySpark使用 PySpark 合并和替换两个数据框的元素
【发布时间】:2020-07-31 03:17:09
【问题描述】:

我有两个数据框:

DF1:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test1 ║ 20  ║  ls     ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test2 ║     ║ baz     ║ 15 ║
╠═══════╬═════╬═════════╬════╣
║ test3 ║     ║ az      ║ 19 ║
╚═══════╩═════╩═════════╩════╝

DF2:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test4 ║ 20  ║ bas     ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test5 ║     ║ baz     ║ 25 ║
╠═══════╬═════╬═════════╬════╣
║ test6 ║ 40  ║ az      ║ 19 ║
╚═══════╩═════╩═════════╩════╝

结果:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test1 ║ 20  ║ ls      ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test2 ║ 40  ║  az     ║ 19 ║
╚═══════╩═════╩═════════╩════╝

我想要达到的目标: 1.当 Id 在两个帧中都匹配时,它应该只考虑输出中的那个记录。 2. 该匹配记录的所有列应替换为 DF1 列。 3. 如果 DF1 列为空且 DF2 中存在数据,则不应替换它。

也尝试过加入:

DF3 = DF1.join(DF2, [DF1.Id == DF2.Id], 'inner')
DF3.show()

结果:

Name,Age,Adress,Id,Name,Age,Adress,Id
test1,20,ls,10,test5,20,bas,10

如果我使用

DF3 = DF1.join(DF2, [DF1.Id == DF2.Id], 'leftsemi')
DF3.show()

它为我提供来自 DF1 的数据,并且不添加来自 D2 的缺失值。

尝试实现以下目标:

for i in df2.columns:
    df2 = df2.withColumn(i, when(df1.Id == col("Id") & (col(i) == ""), df1(i)).otherwise(col(i)))
df2.show()

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    外部联接将保留两个表中的记录以及相应的左/右表中的关联空值。可以比较左右数据(检查是否为空)并替换为非空值。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import when
    
    spark = SparkSession.builder.getOrCreate()
    
    ds1 = [
        {'Name': 'test1', 'Age': 20, 'Address': 'ls', 'Id': 10},
        {'Name': 'test2', 'Age': None, 'Address': 'baz', 'Id': 15},
        {'Name': 'test3', 'Age': None, 'Address': 'az', 'Id': 19},
    ]
    
    ds2 = [
        {'Name': 'test4', 'Age': 20, 'Address': 'az', 'Id': 10},
        {'Name': 'test5', 'Age': None, 'Address': 'az', 'Id': 25},
        {'Name': 'test6', 'Age': 40, 'Address': 'az', 'Id': 19},
    ]
    
    df1 = spark.createDataFrame(ds1)
    df2 = spark.createDataFrame(ds2)
    
    df1.show()
    
    +-------+----+---+-----+
    |Address| Age| Id| Name|
    +-------+----+---+-----+
    |     ls|  20| 10|test1|
    |    baz|null| 15|test2|
    |     az|null| 19|test3|
    +-------+----+---+-----+
    
    df2.show()
    
    +-------+----+---+-----+
    |Address| Age| Id| Name|
    +-------+----+---+-----+
    |     az|  20| 10|test4|
    |     az|null| 25|test5|
    |     az|  40| 19|test6|
    +-------+----+---+-----+
    
    join_by_col = 'Id'
    
    df_ = df1.join(df2, on=[join_by_col], how='outer').orderBy(join_by_col)
    
    df_.show()
    
    +---+-------+----+-----+-------+----+-----+
    | Id|Address| Age| Name|Address| Age| Name|
    +---+-------+----+-----+-------+----+-----+
    | 10|     ls|  20|test1|     az|  20|test4|
    | 15|    baz|null|test2|   null|null| null|
    | 19|     az|null|test3|     az|  40|test6|
    | 25|   null|null| null|     az|null|test5|
    +---+-------+----+-----+-------+----+-----+
    
    for col in df1.columns:
        if col != join_by_col:
            col_ = col + '_'
            df_ = df_.withColumn(
                col_, when(df1[col].isNull(), df2[col]).otherwise(df1[col])).drop(col)
    
    df_.show()
    
    +---+--------+----+-----+
    | Id|Address_|Age_|Name_|
    +---+--------+----+-----+
    | 10|      ls|  20|test1|
    | 15|     baz|null|test2|
    | 19|      az|  40|test3|
    | 25|      az|null|test5|
    +---+--------+----+-----+
    

    【讨论】:

    • 感谢您的帮助,它不符合我的要求列应该是唯一的 + 当记录数不必要地增加时,内存中将加载标题数
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-11-17
    • 1970-01-01
    • 1970-01-01
    • 2019-04-28
    • 1970-01-01
    • 2018-12-25
    • 2016-03-01
    相关资源
    最近更新 更多