【问题标题】:How to filter keys in MapType in PySpark?如何在 PySpark 中过滤 MapType 中的键?
【发布时间】:2023-03-29 08:13:01
【问题描述】:

给定如下的 DataFrame,是否可以在 PySpark 中过滤掉列 collection (MapType(StringType, StringType, True)) 的一些键,同时保持架构不变?

root
 |-- id: string (nullable = true)
 |-- collection: map (nullable = true)
 |    |-- key: string
 |    |-- value: string

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    是的,这是可能的。您应该创建 udf 负责从映射中过滤键并将其与 withColumn 转换一起使用以过滤来自 collection 字段的键。

    以下 Scala 中的示例实现:

    // Start from implementing method in Scala responsible for filtering keys from Map
    def filterKeys(collection: Map[String, String], keys: Iterable[String]): Map[String, String] =
        collection.filter{case (k,_) => !keys.exists(_ == k)}
    
    // Create Spark UDF based on above function
    val filterKeysUdf = udf((collection: Map[String, String], keys: Iterable[String]) => filterKeys(collection, keys))
    
    // Use above udf to filter keys
    val newDf = df.withColumn("collection", filterKeysUdf(df("collection"), lit(Array("k1"))))
    

    在 Python 中的实现:

    # Start from implementing method in Python responsible for filtering keys from dict
    def filterKeys(collection, keys):
        return {k:collection[k] for k in collection if k not in keys}
    
    # Create Spark UDF based on above function
    filterKeysUdf = udf(filterKeys, MapType(StringType(), StringType()))
    
    # Create array literal based on Python list
    keywords_lit = array(*[lit(k) for k in ["k1","k2"]])
    
    # Use above udf to filter keys
    newDf = df.withColumn("collection", filterKeysUdf(df.collection, keywords_lit))
    

    【讨论】:

    • 在 python 中棘手的部分是获取 MapType 作为 udf 的输出
    • 嗨@yauheni_selivonchyk,我已经在Python中添加了实现。
    【解决方案2】:

    我只是想补充一下 Piotr Kalański 所说的内容,以防您希望过滤空值。

    def filterValue(collection):
      return {k:collection[k] for k in collection if collection[k]}
    
    filterValuesUdf = F.udf(filterValue, MapType(StringType(), StringType()))
    
    newDf = source_map_df.withColumn("collection", filterValuesUdf(source_map_df.f))
    

    【讨论】:

      【解决方案3】:

      在 3.1 版中,您可以使用 map_filter 执行此操作:

      import pyspark.sql.functions as f
      
      df.withColumn("filtered_map", f.map_filter("map_col", lambda _, v: v is not None))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-07-29
        • 1970-01-01
        • 2021-12-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多