【问题标题】:How to infer schema of JSON files?如何推断 JSON 文件的模式?
【发布时间】:2018-11-18 12:24:46
【问题描述】:

我在 Java 中有以下字符串

{
    "header": {
        "gtfs_realtime_version": "1.0",
        "incrementality": 0,
        "timestamp": 1528460625,
        "user-data": "metra"
    },
    "entity": [{
            "id": "8424",
            "vehicle": {
                "trip": {
                    "trip_id": "UP-N_UN314_V1_D",
                    "route_id": "UP-N",
                    "start_time": "06:17:00",
                    "start_date": "20180608",
                    "schedule_relationship": 0
                },
                "vehicle": {
                    "id": "8424",
                    "label": "314"
                },
                "position": {
                    "latitude": 42.10085,
                    "longitude": -87.72896
                },
                "current_status": 2,
                "timestamp": 1528460601
            }
        }
    ]
}

表示 JSON 文档。我想在 Spark 数据框中为 流应用程序 推断架构。

如何像 CSV 文档一样拆分字符串的字段(我可以在其中调用.split(""))?

【问题讨论】:

  • 你有代表这个 JSON 的案例类吗?顺便说一句,在结构化流中进行推断是不可能的,只能批量进行
  • 我没有类,但字段的结构是标准的...我在 Spark Programming Guide 中看到可以使用这些命令推断架构:Dataset<Row> df = sparkSession .readStream() .format("kafka") .option("kafka.bootstrap.servers", KafkaFeeds.kafkaBrokerEndpoint) .option("subscribe", "kafkaToSparkTopic") .load();
  • 在“拆分”示例 json 字符串后,您究竟希望输出什么?
  • 我只想提取 JSON 文档的一些字段,例如位置字段的“纬度”值。所以我想要一个易于拆分的格式(如 CSV .split(""))

标签: java json apache-spark spark-streaming


【解决方案1】:

引用官方文档Schema inference and partition of streaming DataFrames/Datasets

默认情况下,来自基于文件的源的结构化流需要您指定架构,而不是依赖 Spark 自动推断它。此限制可确保将一致的模式用于流式查询,即使在失败的情况下也是如此。对于临时用例,您可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用架构推断。

然后您可以使用spark.sql.streaming.schemaInference 配置属性来启用架构推断。我不确定这是否适用于 JSON 文件。

我通常做的是加载单个文件(在批处理查询中和开始流式查询之前)以推断架构。这应该适用于你的情况。只需执行以下操作。

// I'm leaving converting Scala to Java as a home exercise
val jsonSchema = spark
  .read
  .option("multiLine", true) // <-- the trick
  .json("sample.json")
  .schema
scala> jsonSchema.printTreeString
root
 |-- entity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- vehicle: struct (nullable = true)
 |    |    |    |-- current_status: long (nullable = true)
 |    |    |    |-- position: struct (nullable = true)
 |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |-- timestamp: long (nullable = true)
 |    |    |    |-- trip: struct (nullable = true)
 |    |    |    |    |-- route_id: string (nullable = true)
 |    |    |    |    |-- schedule_relationship: long (nullable = true)
 |    |    |    |    |-- start_date: string (nullable = true)
 |    |    |    |    |-- start_time: string (nullable = true)
 |    |    |    |    |-- trip_id: string (nullable = true)
 |    |    |    |-- vehicle: struct (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- label: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- gtfs_realtime_version: string (nullable = true)
 |    |-- incrementality: long (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- user-data: string (nullable = true)

诀窍是使用multiLine 选项,这样整个文件就是您用来推断架构的单行。

【讨论】:

  • 确实如此。由于库依赖关系,它给了我一个错误,但它应该可以正常工作。我使用了 StructType 对象并手动定义了所有字段并且它可以工作。谢谢!这适用于静态文件,但我正在从流应用程序的 Kafka 主题中读取数据,它不会推断架构。你知道吗?
【解决方案2】:

使用

df = spark.read.json(r's3://mypath/', primitivesAsString='true')

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-16
    • 1970-01-01
    • 2021-10-28
    • 2016-11-03
    • 1970-01-01
    • 2021-05-28
    • 1970-01-01
    相关资源
    最近更新 更多