【问题标题】:Filter Pyspark Dataframe with udf on entire row在整行上使用 udf 过滤 Pyspark Dataframe
【发布时间】:2019-02-02 17:45:31
【问题描述】:

有没有办法选择整行作为列输入到 Pyspark 过滤器 udf 中?

我有一个复杂的过滤函数“my_filter”,我想将它应用到整个DataFrame:

my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*"))

但是

col("*")

抛出一个错误,因为这不是一个有效的操作。

我知道我可以将数据帧转换为 RDD,然后使用 RDD 的过滤器方法,但我不想将其转换为 RDD,然后再转换回数据帧。我的 DataFrame 具有复杂的嵌套类型,因此当我再次尝试将 RDD 转换为 DataFrame 时,架构推断会失败。

【问题讨论】:

    标签: pyspark apache-spark-sql user-defined-functions


    【解决方案1】:

    您应该静态写入所有列。例如:

    from pyspark.sql import functions as F
    
    # create sample df
    df = sc.parallelize([
         (1, 'b'),
         (1, 'c'),
    
     ]).toDF(["id", "category"])
    
    #simple filter function
    @F.udf(returnType=BooleanType())
    def my_filter(col1, col2):
        return (col1>0) & (col2=="b")
    
    df.filter(my_filter('id', 'category')).show()
    

    结果:

    +---+--------+
    | id|category|
    +---+--------+
    |  1|       b|
    +---+--------+
    

    如果你有这么多列并且你确定列的顺序:

    cols = df.columns
    df.filter(my_filter(*cols)).show()
    

    产生相同的输出。

    【讨论】:

    • 感谢您提供这个干净的解决方案,但是如果我们有很多列,我们就不能这样做。我正在使用具有 100 列的数据框。你能帮忙处理一下那个案子吗?
    • @pnv 您必须迭代您的架构并将它们添加到记录中
    猜你喜欢
    • 2016-11-24
    • 1970-01-01
    • 2015-10-27
    • 1970-01-01
    • 1970-01-01
    • 2021-03-04
    • 1970-01-01
    • 1970-01-01
    • 2021-08-13
    相关资源
    最近更新 更多