【Question title】: Check whether rows in a DataFrame already exist in another table using PySpark [duplicate]
【Posted】: 2021-01-21 19:29:44
【Question】:

I have a table in Hive, shown below:

spark.sql("select * from custom_log where custm_id = '9012457WXR'").show(100,False)
+----------+--------------------------+---------------------------------------------------------------------------------+----------+
|custm_id  |transf_date               |transfr                                                                          |pdt       |
+----------+--------------------------+---------------------------------------------------------------------------------+----------+
|9012457WXR|2020-12-14 20:24:08.213000|[{'oldstatus': ['new'], 'newValues': ['addition Done']}]                         |20201214  |
|9012457WXR|2020-12-14 20:24:09.175000|[{'oldstatus': ['addition Done'], 'newValues': ['update pref']}]                 |20201214  |
|9012457WXR|2020-12-14 20:24:09.241000|[{'oldstatus': ['update pref'], 'newValues': ['update personal pref']}]          |20201214  |
|9012457WXR|2020-12-14 20:24:09.241000|[{'oldstatus': ['update personal pref'], 'newValues': ['Acct settings']}]        |20201214  |
|9012457WXR|2020-12-14 23:06:23.197000|[{'oldstatus': ['Acct settings'], 'newValues': ['update pref']}]                 |20201214  |
|9012457WXR|2020-12-15 00:03:05.496000|[{'oldstatus': ['update pref'], 'newValues': ['Notificatios mod']}]              |20201215  |
|9012457WXR|2020-12-15 00:03:05.568000|[{'oldstatus': ['Notificatios mod'], 'newValues': ['shop']}]                     |20201215  |
|9012457WXR|2020-12-15 00:03:05.568000|[{'oldstatus': ['shop'], 'newValues': ['Fav']}]                                  |20201215  |
+----------+--------------------------+---------------------------------------------------------------------------------+----------+

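I also have a DataFrame with the same fields. I need to filter out every DataFrame row whose combination of custm_id, transf_date and transfr already exists in the table custom_log. Here is the DataFrame: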

+----------+--------------------------+-------------------------------------------------------------------+----------+
|custm_id  |transf_date               |transfr                                                            |pdt       |
+----------+--------------------------+-------------------------------------------------------------------+----------+
|9012457WXR|2020-12-14 20:24:08.213000|[{'oldstatus': ['Acct settings'], 'newValues': ['update pref']}]   |20201216  |
|9012457WXR|2020-12-16 08:24:18.175000|[{'oldstatus': ['Fav'], 'newValues': ['Wishlist']}]                |20201216  |
|9012457WXR|2020-12-16 08:28:21.241000|[{'oldstatus': ['Wishlist'], 'newValues': ['Rm Wishlist']}]        |20201216  |
|9012457WXR|2020-12-16 11:13:46.241000|[{'oldstatus': ['Rm Wishlist'], 'newValues': ['Shop']}]            |20201216  |
|9012457WXR|2020-12-16 19:06:12.197000|[{'oldstatus': ['Shop'], 'newValues': ['Fav']}]                    |20201216  |
|9012457WXR|2020-12-16 20:03:18.496000|[{'oldstatus': ['Fav'], 'newValues': ['brk']}]                     |20201216  |
+----------+--------------------------+-------------------------------------------------------------------+----------+

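In the DataFrame above, the first row is the same as the fifth row of the table except for pdt. So I need to compare the distinct custm_id, transf_date and transfr combinations between the table and the DataFrame, and filter the matching rows out of the DataFrame.

How can I do this in PySpark?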

【Discussion】:

    Tags: apache-spark pyspark hive apache-spark-sql


    【Solution 1】:

    You can use an anti join to drop the rows that satisfy the join condition:

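    import pyspark.sql.functions as F
    
    # df1: the Hive table; df2: the DataFrame to filter (assumed to exist already)
    df1 = spark.table('custom_log')
    
    # 'anti' (left anti) join: keep only the rows of df2 that have no row in df1
    # with the same custm_id, transf_date and transfr; pdt is not compared
    result = df2.alias('df2').join(df1.alias('df1'),
        F.expr("""
            df2.custm_id = df1.custm_id and
            df2.transf_date = df1.transf_date and
            df2.transfr = df1.transfr
        """),
        'anti'
    )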
    
