【问题标题】:JSON to AVRO to JSONJSON 到 AVRO 到 JSON
【发布时间】:2020-10-22 10:11:44
【问题描述】:

我正在尝试将 json 文件转换为 avro 和反向。

我的输入文件是

[
  {
    "userId": 1,
    "firstName": "Krish",
    "lastName": "Lee",
    "phoneNumber": "123456",
    "emailAddress": "krish.lee@abc.com"
  },
  {
    "userId": 2,
    "firstName": "racks",
    "lastName": "jacson",
    "phoneNumber": "123456",
    "emailAddress": "racks.jacson@abc.com"
  }
]

我的输出文件是

{"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1}
{"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}

下面是我的源码

JSON 到 Avro

val df = spark.read.option("multiLine", true).json("src\\main\\resources\\user.json")
df.printSchema()
df.show()

//convert to avro
df.write.mode("append").format("com.databricks.spark.avro").save("src\\main\\resources\\user1")

AVRO 转 JSON

val jsonDF = spark.read
  .format("com.databricks.spark.avro").load("src\\main\\resources\\user")

jsonDF.show()
jsonDF.printSchema()
jsonDF.write.mode(SaveMode.Overwrite).json("src\\main\\resources\\output\\json")

请帮忙

【问题讨论】:

  • 你有什么问题?
  • 如果您查看输入文件,它是一个包含多个对象的列表。在输出文件中,我只得到一个对象,它不是一个列表。
  • 在写的时候需要把数据转换成数组再写。

标签: scala apache-spark avro


【解决方案1】:

检查下面的代码。

输入数据

scala> import sys.process._

scala> "cat /root/spark-examples/data.json".!
[
  {
    "userId": 1,
    "firstName": "Krish",
    "lastName": "Lee",
    "phoneNumber": "123456",
    "emailAddress": "krish.lee@abc.com"
  },
  {
    "userId": 2,
    "firstName": "racks",
    "lastName": "jacson",
    "phoneNumber": "123456",
    "emailAddress": "racks.jacson@abc.com"
  }
]

将json文件内容加载到DataFrame

scala> val df = spark
                  .read
                  .option("multiline","true")
                  .json("/root/spark-examples/data.json")

df: org.apache.spark.sql.DataFrame = [emailAddress: string, firstName: string ... 3 more fields]

一旦 json 文件被加载到 DataFrame 中,它将被转换为 array of objectmultiple objects or rows,如下所示。

scala> df.show(false)
+--------------------+---------+--------+-----------+------+
|emailAddress        |firstName|lastName|phoneNumber|userId|
+--------------------+---------+--------+-----------+------+
|krish.lee@abc.com   |Krish    |Lee     |123456     |1     |
|racks.jacson@abc.com|racks    |jacson  |123456     |2     |
+--------------------+---------+--------+-----------+------+

当你回写DataFrame时,它会写成多行。

scala> df.repartition(1).write.mode("overwrite").json("/tmp/dataa/")
scala> "ls -ltr /tmp/dataa/".!
total 4
-rw-r--r-- 1 root root 222 Oct 22 12:19 part-00000-fa9e79f6-2689-4385-b3ee-fd19cf291a31-c000.json
-rw-r--r-- 1 root root   0 Oct 22 12:19 _SUCCESS
scala> "cat /tmp/dataa/part-00000-fa9e79f6-2689-4385-b3ee-fd19cf291a31-c000.json".!
{"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1}
{"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}

如果您想要与输入数据相同,请按照以下代码操作。

scala> df
.select(to_json(collect_list(struct($"*"))).as("data"))
.write
.format("text") // You need to use text format, Using json will give you wrong data.
.mode("overwrite")
.save("/tmp/datab/")
scala> "ls -ltr /tmp/datab/".!
total 4
-rw-r--r-- 1 root root 224 Oct 22 12:19 part-00000-0896730e-51e1-4728-bd6b-cdfabc03978e-c000.txt
-rw-r--r-- 1 root root   0 Oct 22 12:19 _SUCCESS
scala> "cat /tmp/datab/part-00000-0896730e-51e1-4728-bd6b-cdfabc03978e-c000.txt".!
[
    {"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1},
    {"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}
]

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-02
    • 1970-01-01
    • 2021-09-08
    • 2020-09-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多