【问题标题】:pyspark: Is it possible to create array with missing elements in one structpyspark:是否可以在一个结构中创建缺少元素的数组
【发布时间】:2021-07-04 21:06:14
【问题描述】:

我的输入 DataFrame 架构如下所示。 d 中元素 1 和 2 的区别在于 1 具有属性 a,b,c,d 而 2 只有 a,b,c

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- d: double (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)

我正在尝试使用以下代码分解 d 的元素

df2 = inputDF.withColumn("d1",f.explode(f.array("d.*").getField("c")))

并得到错误 pyspark.sql.utils.AnalysisException: cannot resolve 'array(d.1, d.2)' 由于数据类型不匹配:输入到函数数组应该都是相同的类型,但它是 [struct, struct]; '项目 [a#832, b#833, c#834, d#835, explode(array(d#835.1, d#835.2)[c]) AS d1#843] +- 关系[a#832,b#833,c#834,d#835] json

有没有办法指示函数在数组函数的输入中缺少列时假定为 NULLS?

【问题讨论】:

  • 你能提供一些输入数据并显示所需的输出吗?

标签: pyspark


【解决方案1】:

您可以在其中一个元素缺少字段的情况下展开结构数组,如下所示:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("SparkTesting") \
    .getOrCreate()

d_schema = ArrayType(StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
]))
df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": [
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
            "d": "d1"
        },
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        }
    ],
}

df = spark.createDataFrame([item1], schema=df_schema)

df.printSchema()
df.show(truncate=False)

df2 = df.withColumn("d1", f.explode(col("d")))
df2.printSchema()
df2.show(truncate=False)
df2.select("d1.c").show()

+---+---+---+--------------------------------------+------------------+
|a  |b  |c  |d                                     |d1                |
+---+---+---+--------------------------------------+------------------+
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, d1}  |
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, null}|
+---+---+---+--------------------------------------+------------------+

如果您不确定数组字段d 本身是否为空,那么建议使用explode_outer() 函数而不是explode()

根据评论匹配架构: 下面的代码将起作用:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()
d_inter_schema = (StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
]))
d_schema = StructType().add("1", d_inter_schema, nullable=True).add("2", d_inter_schema, nullable=True)

df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": {"1": {
        "a": "a1",
        "b": "b1",
        "c": "c1",
        "d": "d1"
    },
        "2": {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        }
    },
}

df = spark.createDataFrame([item1], schema=df_schema)

df.printSchema()
df.show(truncate=False)

+---+---+---+--------------------------------------+
|a  |b  |c  |d                                     |
+---+---+---+--------------------------------------+
|a1 |b1 |c1 |{{a1, b1, c1, d1}, {a1, b1, c1, null}}|
+---+---+---+--------------------------------------+
df.select("d.1.c", "d.2.c").show()
+---+---+
|  c|  c|
+---+---+
| c1| c1|
+---+---+

【讨论】:

  • 感谢您的意见。实际上,在我的情况下,我以结构格式输入 d 列,如下所示。如果我像下面这样输入,那么它不起作用 code item1 = { "a": "a1", "b": "b1", "c": "c1", "d": { "1": { " a”:“a1”,“b”:“b1”,“c”:“c1”,“d”:“d1”},“2”:{“a”:“a1”,“b”:“ b1", "c": "c1", } } }
  • @Ram 我已经更新了答案以匹配您的架构,如果有帮助请告诉我
  • 感谢您的回答。我为一个示例 json 提供了结构。有时我的键名(1,2)是动态的,在某些文件中它将有 1 个元素,有时可能有 10 个元素。完整的架构也非常大。如何处理 struct d 中的动态元素
  • 其实 struct d 确实是数组类型的好候选。您是否可以控制让源将它们作为数组而不是结构发送?结构实际上应该是静态的而不是动态的。不过,您可以做一个解决方法,用您期望的最大元素定义您的 struct d,例如 10 或 20?并保持每一个都可以为空,所以如果你没有全部 20 个,那么它将正常工作,同时,如果你有 20 个也将覆盖。
猜你喜欢
  • 2018-07-23
  • 2020-12-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-02-23
  • 2021-09-23
  • 1970-01-01
相关资源
最近更新 更多