【问题标题】:Spark from_json with dynamic schemaSpark from_json 与动态模式
【发布时间】:2018-08-11 19:20:53
【问题描述】:

我正在尝试使用 Spark 处理具有可变结构(嵌套 JSON)的 JSON 数据。输入 JSON 数据可能非常大,每行有超过 1000 个键,并且一批可能超过 20 GB。 整个批次由 30 个数据源生成,每个 JSON 的“key2”可用于识别源,每个源的结构是预定义的。

处理此类数据的最佳方法是什么? 我已经尝试使用 from_json 如下所示,但它仅适用于固定模式,并且首先使用它我需要根据每个源对数据进行分组,然后应用模式。 由于数据量很大,我的首选是只扫描一次数据,然后根据预定义的模式从每个源中提取所需的值。

import org.apache.spark.sql.types._ 
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF


val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
    .add("key3_k1", StringType))


df.select(from_json($"value",schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])

【问题讨论】:

  • 这是一个想法,而不是一个完整的答案。你的意思是你所有的模式都共享一些公共键,而那些是你关心的? Argonaut's Json 对象是可序列化的,因此您可以先创建一个RDD[Json],然后选择相关字段。 (并非所有 Argonaut 对象都是可序列化的,因此您必须小心。或者选择不同的 JSON 库。)
  • 通用键很少(例如key1,key2),但我关心可变键。公共键包含关于 json 的信息,如时间戳、来源等,可变部分包含实际信息。
  • 你想要的输出是什么?一个数据框,其列是所有相关键的并集(否则为 null)?每个模式一个数据框?我仍然认为您需要使用一些 JSON 库。您必须处理 30 种不同的案例,但我看不出在任何情况下如何避免这种情况——您有 30 种不同类型的数据。
  • 是的,我正在对 30 个 DF 进行“联合”。对于每个 DF,我在字符串列中提取所需的键值。但是我在 UDF 中尝试了 'json-lenses' 库并且它正在工作,但考虑到未来的支持和 spark 的内置优化,我想尝试 spark 的 JSON 函数

标签: json apache-spark apache-spark-sql


【解决方案1】:

这只是对@Ramesh Maharjan 答案的重述,但使用了更现代的 Spark 语法。

我发现这种方法隐藏在 DataFrameReader 中,它允许您将 JSON 字符串从 Dataset[String] 解析为任意 DataFrame 并利用 Spark 在直接从JSON 文件。每行的架构可以完全不同。

def json(jsonDataset: Dataset[String]): DataFrame

示例用法:

val jsonStringDs = spark.createDataset[String](
  Seq(
      ("""{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}"""),
      ("""{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}""")))

jsonStringDs.show

jsonStringDs:org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 
|
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df:org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+

该方法在 Spark 2.2.0 中可用: http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame

【讨论】:

    【解决方案2】:

    如果你有问题中提到的数据

    val data = sc.parallelize(
        """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
        :: Nil)
    

    您不需要为 json 数据创建schemaSpark sql 可以从 json 字符串 中推断出schema。您只需使用SQLContext.read.json,如下所示

    val df = sqlContext.read.json(data)
    

    这将为您提供schema,如下所示上面使用的rdd数据

    root
     |-- key1: string (nullable = true)
     |-- key2: string (nullable = true)
     |-- key3: struct (nullable = true)
     |    |-- key3_k1: string (nullable = true)
    

    你也可以select key3_k1

    df2.select("key3.key3_k1").show(false)
    //+-------+
    //|key3_k1|
    //+-------+
    //|key3_v1|
    //+-------+
    

    您可以随意操作dataframe。希望回答对你有帮助

    【讨论】:

    • 我面临的这种方法有两个问题,1. spark 需要很长时间才能确定架构,因为我的输入数据很大。 2. 两个源可以在同一级别(动态部分)具有相同的键,但值可以是嵌套的 json,在这种情况下,spark 将推断模式直到公共键
    • @Syntax,那么您必须弄清楚输入文件中相同的键的更改,对不起,我无能为力了
    • 感谢 Ramesh 的建议。很遗憾,我无法更改源数据。
    【解决方案3】:

    我不确定我的建议是否可以帮助你,虽然我有类似的情况,我解决了如下:

    1) 所以这个想法是使用 json rapture(或其他一些 json 库)来 动态加载 JSON 模式。例如,您可以阅读第一个 json文件的行以发现架构(类似于我所做的 这里有 jsonSchema)

    2) 动态生成模式。首先遍历动态 字段(请注意,我将 key3 的值投影为 Map[String, String]) 并为它们中的每一个添加一个 StructField 到架构

    3) 将生成的架构应用到您的数据帧中

    import rapture.json._
    import jsonBackends.jackson._
    
    val jsonSchema = """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1", "key3_k2":"key3_v2", "key3_k3":"key3_v3"}}"""
    val json = Json.parse(jsonSchema)
    
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.{StringType, StructType}
    
    val schema = ArrayBuffer[StructField]()
    //we could do this dynamic as well with json rapture
    schema.appendAll(List(StructField("key1", StringType), StructField("key2", StringType)))
    
    val items = ArrayBuffer[StructField]()
    json.key3.as[Map[String, String]].foreach{
      case(k, v) => {
        items.append(StructField(k, StringType))
      }
    }
    val complexColumn =  new StructType(items.toArray)
    schema.append(StructField("key3", complexColumn))
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    val sparkConf = new SparkConf().setAppName("dynamic-json-schema").setMaster("local")
    
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    
    val jsonDF = spark.read.schema(StructType(schema.toList)).json("""your_path\data.json""")
    
    jsonDF.select("key1", "key2", "key3.key3_k1", "key3.key3_k2", "key3.key3_k3").show()
    

    我使用下一个数据作为输入:

    {"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v11", "key3_k2":"key3_v21", "key3_k3":"key3_v31"}}
    {"key1":"val2","key2":"source2","key3":{"key3_k1":"key3_v12", "key3_k2":"key3_v22", "key3_k3":"key3_v32"}}
    {"key1":"val3","key2":"source3","key3":{"key3_k1":"key3_v13", "key3_k2":"key3_v23", "key3_k3":"key3_v33"}}
    

    还有输出:

    +----+-------+--------+--------+--------+
    |key1|   key2| key3_k1| key3_k2| key3_k3|
    +----+-------+--------+--------+--------+
    |val1|source1|key3_v11|key3_v21|key3_v31|
    |val2|source2|key3_v12|key3_v22|key3_v32|
    |val2|source3|key3_v13|key3_v23|key3_v33|
    +----+-------+--------+--------+--------+
    

    我还没有测试过的一个高级替代方案是从 JSON 模式生成一个名为 JsonRow 的案例类,以便拥有一个强类型数据集,它提供更好的序列化性能,除了让你的代码更多可维护。要完成这项工作,您首先需要创建一个 JsonRow.scala 文件,然后您应该实现一个 sbt 预构建脚本,该脚本将根据您的源文件动态修改 JsonRow.scala 的内容(当然您可能有多个)。要动态生成 JsonRow 类,您可以使用以下代码:

    def generateClass(members: Map[String, String], name: String) : Any = {
        val classMembers = for (m <- members) yield {
            s"${m._1}: String"
        }
    
        val classDef = s"""case class ${name}(${classMembers.mkString(",")});scala.reflect.classTag[${name}].runtimeClass"""
        classDef
      }
    

    generateClass 方法接受一个字符串映射来创建类成员和类名本身。生成的类的成员你可以再次从你的 json 模式中填充它们:

    import org.codehaus.jackson.node.{ObjectNode, TextNode}
    import collection.JavaConversions._
    
    val mapping = collection.mutable.Map[String, String]()
    val fields = json.$root.value.asInstanceOf[ObjectNode].getFields
    
    for (f <- fields) {
      (f.getKey, f.getValue) match {
        case (k: String, v: TextNode) => mapping(k) = v.asText
        case (k: String, v: ObjectNode) => v.getFields.foreach(f => mapping(f.getKey) = f.getValue.asText)
        case _ => None
      }
    }
    
    val dynClass = generateClass(mapping.toMap, "JsonRow")
    println(dynClass)
    

    打印出来:

    case class JsonRow(key3_k2: String,key3_k1: String,key1: String,key2: String,key3_k3: String);scala.reflect.classTag[JsonRow].runtimeClass
    

    祝你好运

    【讨论】:

    • 如果可能的话,请您也为此提供python实现。
    猜你喜欢
    • 2017-12-13
    • 1970-01-01
    • 1970-01-01
    • 2019-01-27
    • 1970-01-01
    • 2017-03-29
    • 1970-01-01
    • 2017-07-19
    • 2018-02-08
    相关资源
    最近更新 更多