【Question title】: Need to parse a JSON file
【Posted】: 2020-05-09 17:50:57
【Question】:
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

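I need to parse a JSON file with the schema above into a structured format using a Spark DataFrame. The `keys` column holds the column names, and the `values` column holds the matching values.

Sample data file: {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}

Expected output: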

eid  crt_ts          id        upd_ts        km      pivl  distance  speed  type
1    12343.0000.012  AAGA1567  1333.333.333  565656  10.5  121       64     logs

【Comments】:

    Tags: apache-spark apache-spark-sql


    【Solution 1】:

    Please check the code below; I used groupBy, pivot and agg:

    scala> val js = Seq(""" {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
    js: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> val jdf = spark.read.json(js)
    jdf: org.apache.spark.sql.DataFrame = [eid: string, keys: array<string> ... 2 more fields]
    
    scala> jdf.printSchema
    root
     |-- eid: string (nullable = true)
     |-- keys: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- type: string (nullable = true)
     |-- values: array (nullable = true)
     |    |-- element: array (containsNull = true)
     |    |    |-- element: string (containsNull = true)
    
    
    scala> jdf.show(false)
    +---+-----------------------------------------------+----+-----------------------------------------------------------------+
    |eid|keys                                           |type|values                                                           |
    +---+-----------------------------------------------+----+-----------------------------------------------------------------+
    |1  |[crt_ts, id, upd_ts, km, pivl, distance, speed]|logs|[[12343.0000.012, AAGA1567, 1333.333.333, 565656, 10.5, 121, 64]]|
    +---+-----------------------------------------------+----+-----------------------------------------------------------------+
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
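    // One row per inner array of values
    jdf.select($"eid",$"keys",explode($"values").as("values"),$"type")
    // Pair each key with its value (arrays_zip needs Spark 2.4+), one row per pair
    .select($"eid",$"type",explode(arrays_zip($"keys",$"values")).as("azip"))
    .select($"eid",$"azip.*",$"type")
    // Turn each distinct key into a column; pivot orders these columns alphabetically
    .groupBy($"type",$"eid")
    .pivot($"keys")
    // first() keeps one value per key within each (type, eid) group
    .agg(first("values"))
    .show(false)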
    
    // Exiting paste mode, now interpreting.
    
    +----+---+--------------+--------+--------+------+----+-----+------------+
    |type|eid|crt_ts        |distance|id      |km    |pivl|speed|upd_ts      |
    +----+---+--------------+--------+--------+------+----+-----+------------+
    |logs|1  |12343.0000.012|121     |AAGA1567|565656|10.5|64   |1333.333.333|
    +----+---+--------------+--------+--------+------+----+-----+------------+
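
Note that pivot returns the key columns in alphabetical order, not in the order shown in the question's expected output. If that order matters, one possible alternative (a minimal sketch, not the method used in the answer above) is to read the key names from the first record and index into each row of `values` by position. It assumes every record carries the same `keys` list in the same order; the object name `ParseKeyedJson` and the `local[*]` master are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical standalone wrapper; in spark-shell only the body of main is needed.
object ParseKeyedJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parse-keyed-json")
      .getOrCreate()
    import spark.implicits._

    // Same sample record as in the question
    val js = Seq("""{'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
    val jdf = spark.read.json(js)

    // ASSUMPTION: all records share the same keys, so the first record's list is used for every row
    val keys: Seq[String] = jdf.select("keys").head.getSeq[String](0)

    // One row per inner array of values
    val rows = jdf.select($"eid", $"type", explode($"values").as("vals"))

    // Name the i-th element of each values row after the i-th key
    val valueCols = keys.zipWithIndex.map { case (k, i) => col("vals").getItem(i).as(k) }

    // Columns come out as eid, then the keys in their original order, then type
    val wide = rows.select((col("eid") +: valueCols :+ col("type")): _*)
    wide.show(false)

    spark.stop()
  }
}
```

Unlike the groupBy/pivot version, this keeps one output row per inner array even when several arrays share the same eid, and it avoids a shuffle. It gives wrong column names if records carry different key lists, in which case the pivot approach is the safer choice.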
    
    
