【Question title】: How to compare records from PySpark data frames
【Posted】: 2019-02-12 05:30:20
【Question】:

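I want to compare two data frames and extract records according to the following three conditions.

  1. If a record matches, 'SAME' should appear in a new column FLAG.
  2. If a record does not match and it comes from df1 (say No. 66), 'DF1' should appear in the FLAG column.
  3. If a record does not match and it comes from df2 (say No. 77), 'DF2' should appear in the FLAG column. The whole RECORD has to be considered and verified here, i.e. the comparison is record-wise.
    I also need to run this check on millions of records, using PySpark code.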

df1:

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

df2:

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3000,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Xom,5000,mex,IT,2/11/2019
77,XYZ,5000,mex,IT,2/11/2019

Expected output:

No,Name,Sal,Address,Dept,Join_Date,FLAG
11,Sam,1000,ind,IT,2/11/2019,SAME
22,Tom,2000,usa,HR,2/11/2019,SAME
33,Kom,3500,uk,IT,2/11/2019,DF1
33,Kom,3000,uk,IT,2/11/2019,DF2
44,Nom,4000,can,HR,2/11/2019,SAME
55,Vom,5000,mex,IT,2/11/2019,DF1
55,Xom,5000,mex,IT,2/11/2019,DF2
66,XYZ,5000,mex,IT,2/11/2019,DF1
77,XYZ,5000,mex,IT,2/11/2019,DF2

I loaded the input data as shown below, but I don't know how to proceed.

df1 = pd.read_csv("D:\\inputs\\file1.csv")

df2 = pd.read_csv("D:\\inputs\\file2.csv")

Any help is appreciated. Thanks.

【Comments】:

    Tags: python-3.x pyspark apache-spark-sql


    【Solution 1】:
    # Requisite packages to import
    import sys
    from pyspark.sql.functions import lit, count, col, when
    from pyspark.sql.window import Window
    
    # Create the two dataframes
    df1 = sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                     (33,'Kom',3500,'uk','IT','2/11/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                     (55,'Vom',5000,'mex','IT','2/11/2019'),(66,'XYZ',5000,'mex','IT','2/11/2019')],
                                     ['No','Name','Sal','Address','Dept','Join_Date']) 
    df2 = sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                      (33,'Kom',3000,'uk','IT','2/11/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                      (55,'Xom',5000,'mex','IT','2/11/2019'),(77,'XYZ',5000,'mex','IT','2/11/2019')],
                                      ['No','Name','Sal','Address','Dept','Join_Date']) 
    df1 = df1.withColumn('FLAG',lit('DF1'))
    df2 = df2.withColumn('FLAG',lit('DF2'))
    
    # Concatenate the two DataFrames, to create one big dataframe.
    df = df1.union(df2)
    

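    Use a window function to check whether the count of identical rows is greater than 1. If it is, mark the FLAG column as SAME; otherwise leave it as it is. Finally, drop the duplicates.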

    my_window = Window.partitionBy('No','Name','Sal','Address','Dept','Join_Date').rowsBetween(-sys.maxsize, sys.maxsize)
    df = df.withColumn('FLAG', when((count('*').over(my_window) > 1),'SAME').otherwise(col('FLAG'))).dropDuplicates()
    df.show()
    +---+----+----+-------+----+---------+----+
    | No|Name| Sal|Address|Dept|Join_Date|FLAG|
    +---+----+----+-------+----+---------+----+
    | 33| Kom|3000|     uk|  IT|2/11/2019| DF2|
    | 44| Nom|4000|    can|  HR|2/11/2019|SAME|
    | 22| Tom|2000|    usa|  HR|2/11/2019|SAME|
    | 77| XYZ|5000|    mex|  IT|2/11/2019| DF2|
    | 55| Xom|5000|    mex|  IT|2/11/2019| DF2|
    | 11| Sam|1000|    ind|  IT|2/11/2019|SAME|
    | 66| XYZ|5000|    mex|  IT|2/11/2019| DF1|
    | 55| Vom|5000|    mex|  IT|2/11/2019| DF1|
    | 33| Kom|3500|     uk|  IT|2/11/2019| DF1|
    +---+----+----+-------+----+---------+----+
    

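    【Comments】:

    • This solution works very well for me now. I didn't know about window functions before. It performs well even on millions of records. The other solution is also good, but a few issues came up when processing large amounts of data. Thanks a lot.
    • @cph_sto, what would you suggest for 50+ columns, instead of typing out the column names?
    • What would you recommend for 50+ columns, rather than typing in the column names? @RK
    • What is sys.maxsize?
    • `sys.maxsize` is the largest value Python's native size type can hold (2**63 - 1 on a typical 64-bit build). Python integers themselves are unbounded; here the value just stands in for an unbounded window frame.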
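    【Solution 2】:

    I think you can solve your problem by adding temporary columns that indicate the source and then doing a join. After that you only need to check the condition: whether both sources are present, or whether only one is present and which one.

    Consider the following code: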

    from pyspark.sql.functions import *
    
    
    df1= sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),\
    (22,'Tom',2000,'usa','HR','2/11/2019'),(33,'Kom',3500,'uk','IT','2/11/2019'),\
    (44,'Nom',4000,'can','HR','2/11/2019'),(55,'Vom',5000,'mex','IT','2/11/2019'),\
    (66,'XYZ',5000,'mex','IT','2/11/2019')], \
    ["No","Name","Sal","Address","Dept","Join_Date"])
    
    df2= sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),\
    (22,'Tom',2000,'usa','HR','2/11/2019'),(33,'Kom',3000,'uk','IT','2/11/2019'),\
    (44,'Nom',4000,'can','HR','2/11/2019'),(55,'Xom',5000,'mex','IT','2/11/2019'),\
    (77,'XYZ',5000,'mex','IT','2/11/2019')], \
    ["No","Name","Sal","Address","Dept","Join_Date"])
    #creation of your example dataframes
    
    df1 = df1.withColumn("Source1", lit("DF1"))
    df2 = df2.withColumn("Source2", lit("DF2"))
    #temporary columns to refer the origin later
    
    # Full join on all columns; each source column is only set if the record
    # appears in the corresponding original dataframe
    result = (
        df1.join(df2, ["No","Name","Sal","Address","Dept","Join_Date"], "full")
        # record appears in both dataframes
        .withColumn("FLAG", when(col("Source1").isNotNull() & col("Source2").isNotNull(), "SAME")
        # record appears in only one dataframe
        .otherwise(when(col("Source1").isNotNull(), "DF1").otherwise("DF2")))
        # remove temporary columns
        .drop("Source1", "Source2")
    )
    result.show()
    

    Output:

    +---+----+----+-------+----+---------+----+
    | No|Name| Sal|Address|Dept|Join_Date|FLAG|
    +---+----+----+-------+----+---------+----+
    | 33| Kom|3000|     uk|  IT|2/11/2019| DF2|
    | 44| Nom|4000|    can|  HR|2/11/2019|SAME|
    | 22| Tom|2000|    usa|  HR|2/11/2019|SAME|
    | 77| XYZ|5000|    mex|  IT|2/11/2019| DF2|
    | 55| Xom|5000|    mex|  IT|2/11/2019| DF2|
    | 11| Sam|1000|    ind|  IT|2/11/2019|SAME|
    | 66| XYZ|5000|    mex|  IT|2/11/2019| DF1|
    | 55| Vom|5000|    mex|  IT|2/11/2019| DF1|
    | 33| Kom|3500|     uk|  IT|2/11/2019| DF1|
    +---+----+----+-------+----+---------+----+
    

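    【Comments】:

    • Did you include the import from pyspark.sql.functions import *? With it you should normally be able to call isNotNull() on a column.
    • You can also replace the condition with .withColumn("FLAG",when((col("Source1")=="DF1") & (col("Source2") == "DF2"), "SAME").otherwise(when(col("Source1") == "DF1", "DF1").otherwise("DF2"))), which checks the origin value instead of checking for null.
    • You probably don't need sqlContext, since you create the data frames when reading the files. I only used it to build your example as data frames.
    • Sorry to take up your time. This time I got: AttributeError: 'str' object has no attribute 'equalTo'. I am now using the following statement: df1.join(df2, ["No","Name","Sal","Address","Dept","Join_Date"],"full").withColumn("FLAG",when( (col("Source1")=="DF1") & (col("Source2") == "DF2"), "SAME").otherwise(when(col("Source1").isNotNull(), "DF1 ").otherwise("DF2"))).drop("Source1","Source2").show()
    • Strange, if I simply copy this code I can run it and get the desired result. My guess is that the col function is not working properly, because it should return a column rather than a string. If the import is correct, perhaps something like from pyspark.sql.functions import lit, col, when, then it seems to depend on your runtime environment. You could try referring to the columns as df1.Source1 etc., but then you have to assign the join result to a variable.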