【问题标题】:How convert array of struct into struct in spark?如何将结构数组转换为spark中的结构?
【发布时间】:2022-08-04 12:48:02
【问题描述】:

数据集中有一个数组字段,例如:

my_array:
[
{id: 1, value: x},
{id: 2, value: y}
]

如何使它像:

my_strcut: {
  1: {value: x},
  2: {value: y}
}

我试过map_from_entriestransform,但仍然有结构数组作为输出。

更新

有一个从 json 读取数据的数据集。像这样的数据:

{\"id\":1, ... \"arrayOfStructs\" : [{\"name\": \"x\", \"key\":\"value\"}, {\"name\": \"y\", \"key\":\"value2\"}]}

输出应该是这样的:

{\"id\":1, ... \"structsOnly\" : { \"x\": {\"name\": \"x\", \"key\":\"value\"}}, { \"y\": {\"name\": \"y\", \"key\":\"value2\"}}}

  • 对 ID 号作为列名感到好奇。它们在所有行中都相同吗? Spark DF 需要定义良好的架构和稳定的列名。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

我认为在这种情况下您想使用 MapType 而不是 StructType,因为 struct 要求您知道字段 id 的所有值。像这样使用 transform + aggregate 函数:

val df1 = df.withColumn(
    "structsOnly",
    expr("""aggregate(
              transform(arrayOfStructs, x -> map(x.name, x)), 
              cast(map() as map<string,struct<name:string,key:string>>), 
              (acc, x) -> map_concat(acc, x)
           )
    """)
  ).drop("arrayOfStructs")

df1.printSchema
//root
// |-- id: integer (nullable = false)
// |-- structsOnly: map (nullable = true)
// |    |-- key: string
// |    |-- value: struct (valueContainsNull = true)
// |    |    |-- name: string (nullable = true)
// |    |    |-- key: string (nullable = true)

df1.toJSON.show(false)
//+---------------------------------------------------------------------------------------+
//|value                                                                                  |
//+---------------------------------------------------------------------------------------+
//|{"id":1,"structsOnly":{"x":{"name":"x","key":"value"},"y":{"name":"y","key":"value2"}}}|
//+---------------------------------------------------------------------------------------+

现在,如果你真的想要结构类型列,那么你需要收集字段key 的所有可能值,然后像这样构造列:

val keys = df1.select(map_keys($"structsOnly")).as[Seq[String]].collect.flatten.distinct

val df2 = df1.withColumn(
  "structsOnly",
  struct(keys.map(k => col("structsOnly").getField(k).as(k)): _*)
)

【讨论】:

    【解决方案2】:

    乍一看,这似乎是一项简单的任务,但并非如此......

    使用它作为输入:

    case class Strct(id: Int, value: String)
    val df = Seq(Seq(Strct(1, "x"), Strct(2, "y"))).toDF("my_array")
    
    print(df.toJSON.head())
    // {"my_array":[{"id":1,"value":"x"},{"id":2,"value":"y"}]}
    
    df.printSchema()
    // root
    //  |-- my_array: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- id: integer (nullable = false)
    //  |    |    |-- value: string (nullable = true)
    

    我将首先创建一个映射并提取模式以便随后转换为结构。

    val json_col = to_json(aggregate(
        transform($"my_array", x => x.withField("value", x.dropFields("id"))),
        expr("map_filter(map('', struct('' as value)), (k, v) -> k != k)"),
        (acc, x) => map_concat(acc, map_from_entries(array(x)))
    ))
    val json_schema = spark.read.json(df.select(json_col).as[String]).schema
    val df2 = df.select(from_json(json_col, json_schema).alias("my_struct"))
    

    结果:

    print(df2.toJSON.head())
    // {"my_struct":{"1":{"value":"x"},"2":{"value":"y"}}}
    
    df2.printSchema()
    // root
    //  |-- my_struct: struct (nullable = true)
    //  |    |-- 1: struct (nullable = true)
    //  |    |    |-- value: string (nullable = true)
    //  |    |-- 2: struct (nullable = true)
    //  |    |    |-- value: string (nullable = true)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-08-09
      • 2019-04-04
      • 2023-04-08
      • 1970-01-01
      • 2015-06-22
      • 2010-09-16
      • 2021-09-05
      相关资源
      最近更新 更多