【问题标题】:spark filter (delete) rows based on values from another dataframe [duplicate]基于来自另一个数据帧的值的火花过滤器(删除)行[重复]
【发布时间】:2017-04-19 14:00:37
【问题描述】:

我有一个超过 20 列的“大”数据集 (huge_df)。其中一列是id 字段(使用pyspark.sql.functions.monotonically_increasing_id() 生成)。

使用某些标准,我生成了第二个数据框 (filter_df),其中包含我想稍后从 huge_df 过滤的 id 值。

目前我正在使用 SQL 语法来执行此操作:

filter_df.createOrReplaceTempView('filter_view')
huge_df = huge_df.where('id NOT IN (SELECT id FROM filter_view)')

问题 1: 有没有办法只使用 Python 来做到这一点,即无需注册 TempView

问题 2: 有没有完全不同的方法来完成同样的事情?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    你可以使用JOIN

    huge_df = huge_df.join(filter_df, huge_df.id == filter_df.id, "left_outer")
                     .where(filter_df.id.isNull())
                     .select([col(c) for c in huge_df.columns]
    

    但它会导致昂贵的洗牌。

    逻辑很简单:在 id 字段上使用 filter_df 进行左连接并检查 filter_df 是否为 null - 如果为 null,则表示 filter_df 中没有这样的行

    【讨论】:

    • 你可以说.select([col(c) for c in huge_df.columns]而不是.select(huge_df.columns),不是吗?
    • 谢谢!这有很大帮助。我试图避免join,但事实证明这比id NOT IN (SELECT ...)效率更高
    【解决方案2】:

    这是另一种方法-

    # Sample data
    hugedf = spark.createDataFrame([[1,'a'],[2,'b'],[3,'c'],[4,'d']],schema=(['k1','v1']))
    fildf = spark.createDataFrame([[1,'b'],[3,'c']],schema=(['k2','v2']))
    
    
    from pyspark.sql.functions import col
    hugedf\
    .select('k1')\     
    .subtract(fildf.select('k2'))\ 
    .toDF('d1')\
    .join(hugedf,col('d1')==hugedf.k1)\
    .drop('d1')\
    .show()
    

    逻辑很简单,从hugeDF中找到的id值中减去filteredDf中找到的id值,留下filterDF中没有的id值,

    为了清楚起见,我将减去的值标记为列 'd1',然后在 d1 值上加入 hugeDF 表并删除 d1 以给出最终结果。

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 2015-12-16
      • 1970-01-01
      • 1970-01-01
      • 2018-02-12
      • 1970-01-01
      • 2018-06-07
      • 2018-06-25
      • 2016-06-23
      相关资源
      最近更新 更多