【问题标题】:pyspark: filtering and extract struct through ArrayType columnpyspark:通过 ArrayType 列过滤和提取结构
【发布时间】:2021-06-09 03:18:50
【问题描述】:

我正在使用 pyspark 2.2 并具有以下架构

root
 |-- col1: string (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

和数据

+----+----------------------------------------------+
|col1|col2                                          |
+----+----------------------------------------------+
|A   |[[id1, [k -> v1]], [id2, [k2 -> v5, k -> v2]]]|
|B   |[[id3, [k -> v3]], [id4, [k3 -> v6, k -> v4]]]|
+----+----------------------------------------------+

col2 是一个复杂的结构。它是一个结构数组,每个结构都有两个元素,一个id 字符串和一个metadata 映射。 (这是一个简化的数据集,真实的数据集在 struct 中有 10+ 个元素,在 metadata 字段中有 10+ 个键值对)。

我想形成一个查询,该查询返回一个与我的过滤逻辑匹配的数据帧(比如 col1 == 'A'col2.id == 'id2'col2.metadata.k == 'v2')。

结果看起来像这样,过滤逻辑可以匹配数组中最多一个结构,所以在第二列中它只是一个结构而不是一个结构的数组

+----+--------------------------+
|col1|col2_filtered             |
+----+--------------------------+
|A   |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+

我知道如何通过explode 实现这一点,但问题是col2 通常有超过 100 多个结构,并且最多会有一个与我的过滤逻辑匹配,所以我不认为explode 是可扩展的解决方案。

谁能告诉我怎么做,提前谢谢!

下面是设置的代码块。

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', ArrayType(
        StructType([
            StructField('id', StringType(), True),
            StructField('metadata', MapType(StringType(), StringType()), True)
        ])
    ))
])

data = [
    ('A', [('id1', {'k': 'v1'}), ('id2', {'k': 'v2', 'k2': 'v5'})]),
    ('B', [('id3', {'k': 'v3'}), ('id4', {'k': 'v4', 'k3': 'v6'})])
]

df = spark.createDataFrame(data=data, schema=schema)

【问题讨论】:

    标签: python json apache-spark pyspark apache-spark-sql


    【解决方案1】:

    除了@mck的解决方案,我在搜索后又尝试了三种方法,都得到了想要的结果。

    1. 使用udf过滤并返回匹配的结构体
    df.filter(df.col1 == 'A') \
      .select(df.col1, udf(lambda a: [s for s in a if s.id == 'id2' and s.metadata['k'] == 'v2'], df.schema['col2'].dataType)('col2')[0].alias('col2_filtered')) \
      .na.drop('any')
    
    1. 使用udf过滤并获取匹配结构的索引
    df.filter(df.col1 == 'A') \
      .select(df.col1, df.col2.getItem(udf(lambda a: [i for i, s in enumerate(a) if s.id == 'id2' and s.metadata['k'] == 'v2'], ArrayType(IntegerType(), True))(df.col2)[0]).alias('col2_filtered')) \
      .na.drop('any')
    
    1. 使用 expr 进行过滤,这是 Spark 2.4 中的一项功能,因此可以作为未来升级的候选者
    df.filter(df.col1 == 'A') \
      .select(df.col1, expr("filter(col2, s -> s.id == 'id2' AND s.metadata['k'] == 'v2')").getItem(0).alias('col2_filtered')) \
      .na.drop('any')
    

    【讨论】:

      【解决方案2】:

      编辑:您可以尝试 UDF:

      import pyspark.sql.functions as F
      
      df2 = df.filter(
          F.udf(lambda x: any([y.id == 'id2' and 'k' in y.metadata.keys() for y in x]), 'boolean')('col2')
      ).withColumn(
          'col2',
          F.udf(lambda x: [y for y in x if y.id == 'id2' and 'k' in y.metadata.keys()][0], 'struct<id:string,metadata:map<string,string>>')('col2')
      )
      
      df2.show(truncate=False)
      +----+--------------------------+
      |col1|col2                      |
      +----+--------------------------+
      |A   |[id2, [k2 -> v5, k -> v2]]|
      +----+--------------------------+
      

      您可以将列转换为 JSON 并检查 col2 是否包含所需的 JSON:

      import pyspark.sql.functions as F
      
      df2 = df.filter(
          (F.col('col1') == 'A') &
          F.to_json('col2').contains(
              F.to_json(
                  F.struct(
                      F.lit('id2').alias('id'),
                      F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
                  )
              )
          )
      )
      
      df2.show(truncate=False)
      +----+------------------------------------+
      |col1|col2                                |
      +----+------------------------------------+
      |A   |[[id1, [k -> v1]], [id2, [k -> v2]]]|
      +----+------------------------------------+
      

      如果您只想将匹配的结构保留在 col2 中,可以使用 withColumn 替换它:

      df3 = df2.withColumn(
          'col2', 
          F.struct(
              F.lit('id2').alias('id'),
              F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
          )
      )
      
      df3.show()
      +----+----------------+
      |col1|            col2|
      +----+----------------+
      |   A|[id2, [k -> v2]]|
      +----+----------------+
      

      【讨论】:

      • 感谢@mcd 的快速响应。事实上,这篇文章的数据集是一个简化版本,真正的数据集在结构中有超过 10 多个元素,在元数据映射中有 10 多个键值对。第一个解决方案可以通过array_contains 实现,我相信但这不是我想要的,我想要唯一一个与我的过滤逻辑匹配的结构,而不是包含匹配逻辑的数组。对于第二种解决方案,我不可能 construct 具有匹配值的结构,因为我对结构中的其他值视而不见。您能否修改您的答案以反映这一点。谢谢
      • 不确定您是否完全理解我的答案,但第二种解决方案取决于第一种解决方案。 df2 变量来自第一部分。这有帮助吗?
      • 我得到了你的答案,我担心当我在metadata 中有额外的键值对时它会失败。我编辑了我的帖子以反映这些变化。
      • 这有点难。 Spark 2.4 中的 tbh filter 应该更适合此类任务。对于 Spark 2.2,您可能需要在我的回答中依赖这种技巧,或者如果您不介意性能受到一点影响,请使用 UDF。
      • @FrancisYL 我添加了一个 UDF 解决方案。看看有没有用。
      猜你喜欢
      • 2021-12-24
      • 2021-06-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多