【问题标题】:Databricks - Pyspark - Handling nested json with a dynamic keyDatabricks - Pyspark - 使用动态键处理嵌套 json
【发布时间】:2021-07-21 16:37:17
【问题描述】:

我有一个示例 json 数据文件,其结构如下:

{
    "Header": {
        "Code1": "abc",
        "Code2": "def",
        "Code3": "ghi",
        "Code4": "jkl",
    },
    "TimeSeries": {
        "2020-11-25T03:00:00+00:00": {
            "UnitPrice": 1000,
            "Amount": 10000,

        },
        "2020-11-26T03:00:00+00:00": {
            "UnitPrice": 1000,
            "Amount": 10000,

        }
    }
}

当我使用命令将其解析为数据块时:

df = spark.read.json("/FileStore/test.txt") 

我得到 2 个输出对象:Header 和 TimeSeries。使用 TimeSeries,我希望能够展平结构,使其具有以下架构:

Date
UnitPrice
Amount 

由于日期字段是一个键,我目前只能通过遍历列名然后在点符号中动态地使用它来访问它:

def flatten_json(data):


  columnlist = data.select("TimeSeries.*")
  count = 0 
  for name in data.select("TimeSeries.*"):
    df1 = data.select("Header.*").withColumn(("Timeseries"), lit(columnlist.columns[count])).withColumn("join", lit("a"))
    df2 = data.select("TimeSeries." + columnlist.columns[count] + ".*").withColumn("join", lit("a"))
    if count == 0: 
      df3 = df1.join(df2, on=['join'], how="inner")
    else: 
      df3 = df3.union(df1.join(df2, on=['join'], how="inner"))
    count = count + 1
  return(df3)

这远非理想。有谁知道创建所描述数据框的更好方法?

【问题讨论】:

    标签: python json pyspark databricks


    【解决方案1】:

    想法:

    • 第 1 步:分别提取 Header 和 TimeSeries。

    • 第 2 步:对于 TimeSeries 对象中的每个字段,提取 AmountUnitPrice,连同字段的 name,将它们填充到一个结构中。

    • 第 3 步:将所有这些结构合并到一个数组列中,然后分解它。

    • 第 4 步:从展开的列中提取 TimeseriesAmountUnitPrice

    • 第 5 步:与标题行交叉连接。

    import pyspark.sql.functions as F
    
    header_df = df.select("Header.*")
    timeseries_df = df.select("TimeSeries.*")
    fieldNames = enumerate(timeseries_df.schema.fieldNames())
    cols = [F.struct(F.lit(name).alias("Timeseries"), col(name).getItem("Amount").alias("Amount"), col(name).getItem("UnitPrice").alias("UnitPrice")).alias("ts_" + str(idx)) for idx, name in fieldNames]
    combined = explode(array(cols)).alias("comb")
    timeseries = timeseries_df.select(combined).select('comb.Timeseries', 'comb.Amount', 'comb.UnitPrice')
    result = header_df.crossJoin(timeseries)
    result.show(truncate = False)
    

    输出:

    +-----+-----+-----+-----+-------------------------+------+---------+
    |Code1|Code2|Code3|Code4|Timeseries               |Amount|UnitPrice|
    +-----+-----+-----+-----+-------------------------+------+---------+
    |abc  |def  |ghi  |jkl  |2020-11-25T03:00:00+00:00|10000 |1000     |
    |abc  |def  |ghi  |jkl  |2020-11-26T03:00:00+00:00|10000 |1000     |
    +-----+-----+-----+-----+-------------------------+------+---------+
    

    【讨论】:

      猜你喜欢
      • 2022-10-12
      • 2023-01-16
      • 1970-01-01
      • 2023-02-06
      • 1970-01-01
      • 1970-01-01
      • 2012-09-14
      • 2023-04-01
      • 1970-01-01
      相关资源
      最近更新 更多