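【Question title】: Spark Scala Dataframe Sequence of JSON from a CSV file
【Posted】: 2020-05-14 08:16:09
【Question】:

I have a field in a CSV file that contains a sequence of JSON objects; see the mock data below. from_json only recognizes a sequence of 1 item. Can someone offer guidance on the idiomatic Spark/Scala way to convert the string field into a sequence? I believe that once I have a sequence, I can explode the field and use the from_json function.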

+------+-----------------------------------------------------------------------------------------------------------------------------+
|MainId|genres                                                                                                                       |
+------+-----------------------------------------------------------------------------------------------------------------------------+
|862   |[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}, {'id': 10751, 'name': 'Family'}]                             |
|8844  |[{'id': 12, 'name': 'Adventure'}, {'id': 14, 'name': 'Fantasy'}, {'id': 10751, 'name': 'Family'}]                            |
|15602 |[{'id': 10749, 'name': 'Romance'}, {'id': 35, 'name': 'Comedy'}]                                                             |
|31357 |[{'id': 35, 'name': 'Comedy'}, {'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]                                |
|11862 |[{'id': 35, 'name': 'Comedy'}]                                                                                               |
+------+-----------------------------------------------------------------------------------------------------------------------------+

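【Comments】:

  • You can use Spark to read this column as JSON: val jsonDF = spark.read.json(df("genres")). If you need MainId, you can first use monotonically_increasing_id and then join the result back to your dataframe.
  • Unfortunately, that construct does not work. If it helps, I am using Spark 2.4.
  • Can you share the schema of df?
  • Here is the schema of the input dataframe:
  • ```adult: boolean, belongs_to_collection: string, budget: integer, genres: string, homepage: string, MainId: string, imdb_id: string, original_language: string, original_title: string, overview: string, popularity: double, poster_path: string, production_companies: string, production_countries: string, release_date: date, revenue: integer, runtime: double, spoken_languages: string, status: string, tagline: string, title: string, video: boolean, vote_average: double, vote_count: integer```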

Tags: json scala csv apache-spark sequence


【Solution 1】:
scala> df.show(false)
+------+--------------------------------------------------------------------------------------+
|MainId|genres                                                                                |
+------+--------------------------------------------------------------------------------------+
|862   |[{'id':16,'name':'Animation'},{'id':35,'name':'Comedy'},{'id':10751,'name':'Family'}] |
|8844  |[{'id':12,'name':'Adventure'},{'id':14,'name':'Fantasy'},{'id':10751,'name':'Family'}]|
|15602 |[{'id':10749,'name':'Romance'},{'id':35,'name':'Comedy'}]                             |
|31357 |[{'id':35,'name':'Comedy'},{'id':18,'name':'Drama'},{'id':10749,'name':'Romance'}]    |
|11862 |[{'id':35,'name':'Comedy'}]                                                           |
+------+--------------------------------------------------------------------------------------+


scala> val df1 = df.withColumn("genres", regexp_replace(col("genres"), "[\\[\\]{}']", ""))
                   .withColumn("genres", explode(split(col("genres"), ",")))
                   .withColumn("columns", split(col("genres"), ":")(0))
                   .withColumn("value", split(col("genres"), ":")(1))

scala> df1.show(false)
+------+--------------+-------+---------+
|MainId|genres        |columns|value    |
+------+--------------+-------+---------+
|862   |id:16         |id     |16       |
|862   |name:Animation|name   |Animation|
|862   |id:35         |id     |35       |
|862   |name:Comedy   |name   |Comedy   |
|862   |id:10751      |id     |10751    |
|862   |name:Family   |name   |Family   |
|8844  |id:12         |id     |12       |
|8844  |name:Adventure|name   |Adventure|
|8844  |id:14         |id     |14       |
|8844  |name:Fantasy  |name   |Fantasy  |
|8844  |id:10751      |id     |10751    |
|8844  |name:Family   |name   |Family   |
|15602 |id:10749      |id     |10749    |
|15602 |name:Romance  |name   |Romance  |
|15602 |id:35         |id     |35       |
|15602 |name:Comedy   |name   |Comedy   |
|31357 |id:35         |id     |35       |
|31357 |name:Comedy   |name   |Comedy   |
|31357 |id:18         |id     |18       |
|31357 |name:Drama    |name   |Drama    |
+------+--------------+-------+---------+
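
To make the cleanup step easier to follow, here is a minimal plain-Scala sketch (no Spark) of what the regexp_replace and the two split calls do to a single genres value. The object and method names (GenresSplitSketch, clean, pairs) are hypothetical and exist only for illustration.

```scala
// Plain Scala walk-through of the string cleanup applied to one genres value.
object GenresSplitSketch {
  // Same character class as the regexp_replace call: drops [ ] { } and single quotes.
  def clean(raw: String): String = raw.replaceAll("[\\[\\]{}']", "")

  // Mirrors explode(split(..., ",")) followed by split(..., ":") on each piece.
  def pairs(raw: String): Seq[(String, String)] =
    clean(raw).split(",").toSeq.map { piece =>
      val kv = piece.split(":")
      (kv(0), kv(1))
    }

  def main(args: Array[String]): Unit = {
    val raw = "[{'id':35,'name':'Comedy'},{'id':18,'name':'Drama'}]"
    pairs(raw).foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```

Note that this relies on the compact form shown in this answer's df. With the spacing used in the question ('id': 16, 'name': ...), the pieces keep their leading spaces (for example " name"), so a trim on each key and value would probably be needed before pivoting. A comma or colon inside a genre name would also split in the wrong place.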

scala>  val df2 = df1.groupBy("MainId").pivot("columns")
                     .agg(collect_list(col("value")))
                     .withColumn("json", explode(arrays_zip(col("id"), col("name"))))
                     .select("MainId", "json")

scala> df2.show()
+------+----------------+
|MainId|            json|
+------+----------------+
|   862| [16, Animation]|
|   862|    [35, Comedy]|
|   862| [10751, Family]|
| 15602|[10749, Romance]|
| 15602|    [35, Comedy]|
| 11862|    [35, Comedy]|
| 31357|    [35, Comedy]|
| 31357|     [18, Drama]|
| 31357|[10749, Romance]|
|  8844| [12, Adventure]|
|  8844|   [14, Fantasy]|
|  8844| [10751, Family]|
+------+----------------+
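
The pivot, collect_list and arrays_zip step pairs the n-th id with the n-th name for each MainId. A rough plain-Scala analogue is sketched below; PivotZipSketch and zipGenres are hypothetical names, and the in-memory Seq stands in for the rows of df1.

```scala
// Plain Scala analogue of groupBy("MainId").pivot("columns") + collect_list + arrays_zip.
object PivotZipSketch {
  // (MainId, key, value) triples, shaped like the MainId/columns/value columns of df1.
  type Row = (String, String, String)

  def zipGenres(rows: Seq[Row]): Map[String, Seq[(String, String)]] =
    rows.groupBy(_._1).map { case (mainId, group) =>
      // One list per pivoted column, in the order the rows were seen.
      val ids   = group.collect { case (_, "id", v) => v }
      val names = group.collect { case (_, "name", v) => v }
      // Position-wise pairing, which is what arrays_zip does with the two arrays.
      mainId -> ids.zip(names)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("15602", "id", "10749"), ("15602", "name", "Romance"),
      ("15602", "id", "35"),    ("15602", "name", "Comedy")
    )
    zipGenres(rows).foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```

The pairing is only correct if both lists keep the original row order. A local Seq does, but Spark documents collect_list as non-deterministic after a shuffle, so on larger inputs the ids and names are not guaranteed to line up.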


scala> df2.select("MainId","json.id","json.name").show
+------+-----+---------+
|MainId|   id|     name|
+------+-----+---------+
|   862|   16|Animation|
|   862|   35|   Comedy|
|   862|10751|   Family|
| 15602|10749|  Romance|
| 15602|   35|   Comedy|
| 11862|   35|   Comedy|
| 31357|   35|   Comedy|
| 31357|   18|    Drama|
| 31357|10749|  Romance|
|  8844|   12|Adventure|
|  8844|   14|  Fantasy|
|  8844|10751|   Family|
+------+-----+---------+
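
As an alternative to splitting the string by hand, from_json also accepts an ArrayType schema, so the whole column can be parsed and then exploded. The following is a minimal sketch under stated assumptions, not a tested fix for the asker's data: the object name GenresFromJsonSketch, the helper explodeGenres, the local SparkSession and the two sample rows are all hypothetical, and the allowSingleQuotes option is passed explicitly even though it is expected to be enabled by default.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}

object GenresFromJsonSketch {
  // Schema for the whole genres value: an array of {id, name} structs.
  val genreSchema: ArrayType = ArrayType(StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)
  )))

  // Parses the genres string as a JSON array and emits one row per genre.
  def explodeGenres(df: DataFrame): DataFrame =
    df.withColumn(
        "genre",
        explode(from_json(col("genres"), genreSchema, Map("allowSingleQuotes" -> "true"))))
      .select(col("MainId"), col("genre.id"), col("genre.name"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("genres-from-json").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("862", "[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}, {'id': 10751, 'name': 'Family'}]"),
      ("11862", "[{'id': 35, 'name': 'Comedy'}]")
    ).toDF("MainId", "genres")

    explodeGenres(df).show(false)
    spark.stop()
  }
}
```

This keeps each id next to its name without a groupBy, so it does not depend on row order. Rows whose genres string is not valid JSON (for example a name containing an unescaped apostrophe) would likely parse to null and be dropped by explode.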
