即使数组中某个元素缺少字段，也可以展开（explode）结构体数组，如下所示：
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType, StructField, StringType
# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("SparkTesting")
    .getOrCreate()
)

# Column "d" is an array of structs, each with four nullable string fields.
d_schema = ArrayType(
    StructType()
    .add("a", StringType(), nullable=True)
    .add("b", StringType(), nullable=True)
    .add("c", StringType(), nullable=True)
    .add("d", StringType(), nullable=True)
)

# Top-level row: three nullable strings plus the array-of-structs column "d".
df_schema = StructType([
    StructField("a", StringType(), nullable=True),
    StructField("b", StringType(), nullable=True),
    StructField("c", StringType(), nullable=True),
    StructField("d", d_schema, nullable=True),
])

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": [
        {"a": "a1", "b": "b1", "c": "c1", "d": "d1"},
        # The second element deliberately omits the "d" field.
        {"a": "a1", "b": "b1", "c": "c1"},
    ],
}

# Build a one-row DataFrame against the explicit schema and inspect it.
df = spark.createDataFrame([item1], schema=df_schema)
df.printSchema()
df.show(truncate=False)

# Explode the array column "d" into a new struct column "d1".
exploded_d = f.explode(col("d"))
df2 = df.withColumn("d1", exploded_d)
df2.printSchema()
df2.show(truncate=False)

# Fields of the exploded struct are addressable with dot notation.
df2.select("d1.c").show()
+---+---+---+--------------------------------------+------------------+
|a |b |c |d |d1 |
+---+---+---+--------------------------------------+------------------+
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, d1} |
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, null}|
+---+---+---+--------------------------------------+------------------+
如果不确定数组字段 d 本身是否可能为 null 或为空数组，建议使用 explode_outer() 函数而不是 explode()：explode() 会丢弃这类行，而 explode_outer() 会保留该行并在展开后的列中输出 null。
按照评论中的要求调整后的 schema：
下面的代码可以正常运行：
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("StructuredStreamTesting")
    .getOrCreate()
)

# Innermost struct: four nullable string fields.
d_inter_schema = (
    StructType()
    .add("a", StringType(), nullable=True)
    .add("b", StringType(), nullable=True)
    .add("c", StringType(), nullable=True)
    .add("d", StringType(), nullable=True)
)

# Column "d" is a struct whose fields "1" and "2" each hold the inner struct.
d_schema = StructType([
    StructField("1", d_inter_schema, nullable=True),
    StructField("2", d_inter_schema, nullable=True),
])

# Top-level row: three nullable strings plus the struct-of-structs column "d".
df_schema = StructType([
    StructField("a", StringType(), nullable=True),
    StructField("b", StringType(), nullable=True),
    StructField("c", StringType(), nullable=True),
    StructField("d", d_schema, nullable=True),
])

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": {
        "1": {"a": "a1", "b": "b1", "c": "c1", "d": "d1"},
        # Entry "2" deliberately omits the "d" field.
        "2": {"a": "a1", "b": "b1", "c": "c1"},
    },
}

# Build a one-row DataFrame against the explicit schema and inspect it.
df = spark.createDataFrame([item1], schema=df_schema)
df.printSchema()
df.show(truncate=False)
+---+---+---+--------------------------------------+
|a |b |c |d |
+---+---+---+--------------------------------------+
|a1 |b1 |c1 |{{a1, b1, c1, d1}, {a1, b1, c1, null}}|
+---+---+---+--------------------------------------+
# Pull field "c" out of both nested structs ("1" and "2") under column "d".
nested_c = df.select("d.1.c", "d.2.c")
nested_c.show()
+---+---+
| c| c|
+---+---+
| c1| c1|
+---+---+