【Question title】: Compare two DataFrames and check for changes
【Posted】: 2021-01-15 20:02:12
【Question】:

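I have two similar Spark DataFrames, df1 and df2, that I want to compare for changes:

  • df1 and df2 share the same columns
  • df2 can have more rows than df1, but any extra rows in df2 that are not in df1 can be ignored in the comparison
  • The comparison key columns are PROGRAM_NAME and ACTION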
df1 = spark.createDataFrame([
              ["PROG1","ACTION1","10","NEW"],
              ["PROG2","ACTION2","12","NEW"],
              ["PROG3","ACTION1","14","NEW"],
              ["PROG4","ACTION4","16","NEW"]
    ],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])

df2 = spark.createDataFrame([
          ["PROG1","ACTION1","11","IN PROGRESS"],
          ["PROG2","ACTION2","12","NEW"],
          ["PROG3","ACTION1","20","FINISHED"],
          ["PROG4","ACTION4","14","IN PROGRESS"],
          ["PROG5","ACTION1","20","NEW"]
],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])

df1, df2, and the expected result I want after comparing the two DataFrames are shown below.

【Comments】:

    Tags: apache-spark pyspark apache-spark-sql pyspark-dataframes


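    【Solution 1】:

    Similar questions have been asked on SO many times.

    Use a simple join to get the rows that are in both df1 and df2, then filter for the rows where either of the other two columns has a different value: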

    from pyspark.sql.functions import col
    
    df_final = df2.alias("new").join(
            df1.alias("old"),
            (col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME")) & (col("new.ACTION") == col("old.ACTION"))
        ).filter(
            (col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS"))
        ).select("new.*")
    
    df_final.show()
    
    #+------------+-------+------+-----------+
    #|PROGRAM_NAME| ACTION|VALUE1|     STATUS|
    #+------------+-------+------+-----------+
    #|       PROG3|ACTION1|    20|   FINISHED|
    #|       PROG4|ACTION4|    14|IN PROGRESS|
    #|       PROG1|ACTION1|    11|IN PROGRESS|
    #+------------+-------+------+-----------+
    

    You can also add the filter condition directly to the join condition.

    【Comments】:

      【Solution 2】:

      You can get the result like this:

      import pandas as pd
      
      dict1 = {"PROGRAM_NAME":["PROG1","PROG2","PROG3","PROG4"],
               "ACTION":["ACTION1","ACTION2","ACTION1","ACTION4"],
               "Value1":[10,12,14,16],
               "Status":["NEW","NEW","NEW","NEW"]} 
      dict2 = {"PROGRAM_NAME":["PROG1","PROG2","PROG3","PROG4","PROG5"],
               "ACTION":["ACTION1","ACTION2","ACTION1","ACTION4","ACTION1"],
               "Value1":[11,12,20,14,20],
               "Status":["IN PROGRESS","NEW","FINISHED","IN PROGRESS","NEW"]}
      
      DF1 = pd.DataFrame(dict1)
      DF2 = pd.DataFrame(dict2)
      
      DF3 = DF2.copy()
      DF3 = DF3[DF3["PROGRAM_NAME"].isin(DF1["PROGRAM_NAME"])]
      

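      Output:

      【Comments】:

      • Why don't you need the ACTION criterion identified in the question?
      • Thanks for the reply. This almost worked. But if I add a new row to both DF1 and DF2 in which the values in all columns are the same (no change), it should "not" appear in the final DataFrame, because no data was changed. If possible, could you post the answer in Spark?
      【Solution 3】:

      You can merge df1 and df2 and keep only df2's VALUE1 and STATUS: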

      df1

      df2

      Give df1's columns the suffix _x, leave df2's columns without a suffix, and then keep only df2's columns:

      df1.merge(df2, on=['PROGRAM_NAME', 'ACTION'], suffixes=('_x', ''))[df2.columns] 
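The merge above returns every row whose key appears in both frames, including unchanged ones. A minimal pandas sketch of one way to keep only the changed rows, assuming the sample data from the question with integer VALUE1 values. The names `keys`, `merged`, `changed_mask` and `changed` are illustrative.

```python
import pandas as pd

df1 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4"],
    "VALUE1": [10, 12, 14, 16],
    "STATUS": ["NEW", "NEW", "NEW", "NEW"],
})
df2 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4", "PROG5"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4", "ACTION1"],
    "VALUE1": [11, 12, 20, 14, 20],
    "STATUS": ["IN PROGRESS", "NEW", "FINISHED", "IN PROGRESS", "NEW"],
})

keys = ["PROGRAM_NAME", "ACTION"]

# Inner merge on both keys: df1's value columns get the _x suffix,
# df2's keep their original names
merged = df1.merge(df2, on=keys, suffixes=("_x", ""))

# A row counts as changed if either non-key column differs between old and new
changed_mask = (
    (merged["VALUE1_x"] != merged["VALUE1"])
    | (merged["STATUS_x"] != merged["STATUS"])
)

# Keep only df2's columns for the changed rows
changed = merged.loc[changed_mask, list(df2.columns)].reset_index(drop=True)
print(changed)
```

With this data the unchanged PROG2 row and the df2-only PROG5 row are both excluded.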
      

      【Comments】:

        【Solution 4】:

        Here is a solution in Spark:

        import pyspark.sql.functions as F
        
        df1 = spark.createDataFrame(
            [
                ('PROG1', 'ACTION1', 10, 'NEW'),
                ('PROG2', 'ACTION2', 12, 'NEW'),
                ('PROG3', 'ACTION1', 14, 'NEW'),
                ('PROG4', 'ACTION4', 16, 'NEW'),
            ],
            ['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
        )
        
        df2 = spark.createDataFrame(
            [
                ('PROG1', 'ACTION1', 11, 'IN PROGRESS'),
                ('PROG2', 'ACTION2', 12, 'NEW'),
                ('PROG3', 'ACTION1', 20, 'FINISHED'),
                ('PROG4', 'ACTION4', 14, 'IN PROGRESS'),
                ('PROG5', 'ACTION1', 20, 'NEW'),
            ],
            ['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
        )
        
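        # Alias both DataFrames so same-named columns can be told apart after the join
        df1 = df1.alias('df1')
        df2 = df2.alias('df2')
        df = df1.join(df2, on=['PROGRAM_NAME', 'ACTION'], how='inner')
        # Keep only rows where at least one non-key column changed
        df = df.filter(
            (F.col('df1.Value1') != F.col('df2.Value1'))
            | (F.col('df1.Status') != F.col('df2.Status'))
        )
        # Return the key columns plus df2's (new) values
        df = df.select(
            F.col('PROGRAM_NAME'),
            F.col('ACTION'),
            *[F.col(f'df2.{c}') for c in df2.columns[2:]]
        )
        df.show()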
        

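        【Comments】:

        • Thanks for the reply. This almost worked. But if I add a new row to both DF1 and DF2 in which the values in all columns are the same (no change), it should "not" appear in the final DataFrame, because no data was changed. If possible, could you post the answer in Spark?
        • @Avi, the solution is now written in Spark, and fixed.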