【问题标题】:Read JSON in pySpark with custom schema in GCP Dataproc使用 GCP Dataproc 中的自定义架构在 pySpark 中读取 JSON
【发布时间】:2021-06-19 04:52:00
【问题描述】:

在 GCP Dataproc(使用 pySpark)中,我正在执行一项任务,即根据自定义架构读取 JSON 文件并将其加载到 Dataframe 中。

我确实有以下示例测试 JSON:

{"Transactions": [{"schema": "a",
"id": "1",
"app": "testing",
"description": "JSON schema for testing purpose"}]}

我创建了以下架构:

custom_schema = StructType([
                      StructField("Transactions",
                         StructType([
                             StructField("schema", StringType()),
                             StructField("id", StringType()),
                             StructField("app", StringType()),
                             StructField("description", StringType())
                                   ])
                            )])

将 JSON 读取为: df_2 = spark.read.json(json_path, schema = custom_schema)

得到以下结果,

现在,我需要检查 Dataframe 中的数据,当我尝试执行 df_2.show() 时,会花费太多时间并显示为 kernel Busy 几个小时。

我需要帮助,我在代码中缺少什么以及如何查看数据框中的数据(表格格式)。

【问题讨论】:

  • 尝试了以下,但没有奏效。 ``` custom_schema = StructType([ StructField("Transactions", ArrayType(StructType([ StructField("schema", StringType())), StructField("id", StringType()), StructField("app", StringType() ), StructField("description", StringType()) ])) )]) ```
  • 你可以试试 df_2.show(5) 看看有没有结果?
  • 是的,我试过了……没用……Dataproc 集群配置似乎也有问题?
  • 您是否尝试将其作为 PySpark 作业提交,而不是在 Jupyter 笔记本中运行。
  • 我无法重现您的问题,df.show() 为我工作。您可以尝试使用更简单的架构。

标签: json google-cloud-platform apache-spark-sql pyspark-dataframes google-cloud-dataproc


【解决方案1】:

我认为问题在于您的自定义架构定义和 JSON 文件。以下代码和 JSON 文件对我有用:

代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

spark = SparkSession \
    .builder \
    .appName("JSON test") \
    .getOrCreate()

custom_schema = StructType([
    StructField("schema", StringType(), False),
    StructField("id", StringType(), True),
    StructField("app", StringType(), True),
    StructField("description", StringType(), True)])

df = spark.read.format("json") \
    .schema(custom_schema) \
    .load("gs://my-bucket/transactions.json")

df.show()

JSON 文件

gs://my-bucket/transactions.json的内容是:

{"schema": "a", "id": "1", "app": "foo", "description": "test"}
{"schema": "b", "id": "2", "app": "bar", "description": "test2"}

输出

+------+---+---+-----------+
|schema| id|app|description|
+------+---+---+-----------+
|     a|  1|foo|       test|
|     b|  2|bar|      test2|
+------+---+---+-----------+

【讨论】:

    猜你喜欢
    • 2023-01-20
    • 2019-11-22
    • 2019-05-10
    • 1970-01-01
    • 1970-01-01
    • 2022-01-03
    • 1970-01-01
    • 2021-07-15
    • 1970-01-01
    相关资源
    最近更新 更多