[Question Title]: Spark - Specifying Schema for nested Json
[Posted]: 2017-07-26 09:21:08
[Question Description]:

Using Spark 1.2.0

Hi,

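I want to save data from a Kafka stream to Parquet, applying a schema to the JSON dataset when creating a table with jsonRDD, as described here: https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html

The data comes from Kafka and arrives as nested JSON.

Here is a basic example that reads from a text file and shows how I specify the schema for non-nested JSON.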

    //contents of json
    hdfs@2db12:~$ hadoop fs -cat User/names.json
    {"name":"Michael", "age":10}
    {"name":"Andy", "age":30}
    {"name":"Justin"}

    //create RDD from json
    scala> val names= sc.textFile("hdfs://10.0.11.8:8020/user/hdfs/User/names.json")
    scala> names.collect().foreach(println)
    {"name":"Michael", "age":10}
    {"name":"Andy", "age":30}
    {"name":"Justin"}

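    // specify schema
    // Spark 1.2: StructType, StructField and StringType live in org.apache.spark.sql
    // (from Spark 1.3 on, import org.apache.spark.sql.types._ instead)
    import org.apache.spark.sql._

    val schemaString = "name age gender"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    val peopleSchemaRDD = sqlContext.jsonRDD(names, schema)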

    scala> peopleSchemaRDD.printSchema()
    root
    |-- name: string (nullable = true)
    |-- age: string (nullable = true)
    |-- gender: string (nullable = true)

    scala> peopleSchemaRDD.registerTempTable("people")

    scala> sqlContext.sql("SELECT name,age,gender FROM people").collect().foreach(println)
    [Michael,10,null]
    [Andy,30,null]
    [Justin,null,null]

Is it possible to specify a schema for nested JSON? For example, for JSON like this: {"filename":"details","attributes":{"name":"Michael", "age":10}}

Many thanks


Tags: apache-spark


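[Solution 1]:

If at least one of your JSON records has a gender field, you can use sqlContext.jsonFile() and let Spark infer the schema,

or define the schema explicitly: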

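    // Spark 1.2: the type classes are in org.apache.spark.sql
    // (use org.apache.spark.sql.types._ from Spark 1.3 on)
    import org.apache.spark.sql._

    val schemaString = "name age gender"  // the nested field names, as in the question

    val schema = StructType(
      StructField("filename", StringType, true) ::
      StructField(
        "attributes",
        StructType(schemaString.split(" ").map(fieldName =>
          StructField(fieldName, StringType, true)
        )),
        true
      ) :: Nil
    )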
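For reference, a sketch of applying this nested schema and reading the nested fields back, assuming Spark 2.2 or later, where `spark.read.schema(...).json(...)` accepts a `Dataset[String]` (the question's Spark 1.2 code uses `sqlContext.jsonRDD` instead). The object name `NestedJsonSchema`, its helper methods and the two sample records are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object NestedJsonSchema {
  // Same shape as the schema above: a top-level "filename" string and an
  // "attributes" struct whose fields are all nullable strings.
  def nestedSchema(schemaString: String): StructType =
    StructType(
      StructField("filename", StringType, nullable = true) ::
      StructField(
        "attributes",
        StructType(schemaString.split(" ").map(f => StructField(f, StringType, nullable = true))),
        nullable = true
      ) :: Nil
    )

  // Returns (filename, attributes.name, attributes.age) for each sample record.
  def run(spark: SparkSession): Array[(String, String, String)] = {
    import spark.implicits._

    // Hypothetical sample records shaped like the JSON in the question, one object per line.
    val lines = Seq(
      """{"filename":"details","attributes":{"name":"Michael", "age":10}}""",
      """{"filename":"details","attributes":{"name":"Andy", "age":30}}"""
    ).toDS()

    // Supplying the schema explicitly means Spark does not infer one from the data.
    val people = spark.read.schema(nestedSchema("name age gender")).json(lines)
    people.printSchema()

    // Nested fields are addressed with dot notation.
    people.createOrReplaceTempView("people")
    spark.sql("SELECT filename, attributes.name, attributes.age FROM people")
      .as[(String, String, String)]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NestedJsonSchema")
      .master("local[*]")
      .getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

Because `age` is declared as `StringType`, the numeric JSON values should come back as strings, as in the question's output; declare it as a numeric type instead if that is what you need downstream.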
    


[Solution 2]:

A Java version. The link below helped me:

create nested dataframe programmatically with Spark

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.AnalysisException;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.ArrayType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class SaveToCSV {

        public static void main(String[] args) throws AnalysisException {
            String master = "local[*]";

            // Top-level employee fields
            List<StructField> employeeFields = new ArrayList<>();
            employeeFields.add(DataTypes.createStructField("firstName", DataTypes.StringType, true));
            employeeFields.add(DataTypes.createStructField("lastName", DataTypes.StringType, true));
            employeeFields.add(DataTypes.createStructField("email", DataTypes.StringType, true));

            // Nested address struct; "addresses" is an array of these structs
            List<StructField> addressFields = new ArrayList<>();
            addressFields.add(DataTypes.createStructField("city", DataTypes.StringType, true));
            addressFields.add(DataTypes.createStructField("state", DataTypes.StringType, true));
            addressFields.add(DataTypes.createStructField("zip", DataTypes.StringType, true));
            ArrayType addressesType = DataTypes.createArrayType(DataTypes.createStructType(addressFields));

            employeeFields.add(DataTypes.createStructField("addresses", addressesType, true));
            StructType employeeSchema = DataTypes.createStructType(employeeFields);

            SparkSession sparkSession = SparkSession
                    .builder().appName(SaveToCSV.class.getName())
                    .master(master).getOrCreate();

            SparkContext context = sparkSession.sparkContext();
            context.setLogLevel("ERROR");

            SQLContext sqlCtx = sparkSession.sqlContext();

            // Employee is a JavaBean (not shown in this answer) whose properties match employeeSchema
            Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);

            // Supplying the schema explicitly lets Spark skip JSON schema inference
            Dataset<Employee> rowDataset = sparkSession.read()
                    .schema(employeeSchema)
                    .json("simple_employees.json").as(employeeEncoder);

            rowDataset.createOrReplaceTempView("employeeView");

            sqlCtx.sql("select * from employeeView").show();

            sparkSession.close();
        }
    }
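For comparison, a Scala sketch of the same array-of-structs schema, assuming Spark 2.2 or later. It uses `explode` from `org.apache.spark.sql.functions` to turn each element of `addresses` into its own row; the object name `NestedArraySchema`, the helper `stringField` and the sample record are hypothetical and stand in for the answer's `simple_employees.json`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object NestedArraySchema {
  private def stringField(name: String): StructField =
    StructField(name, StringType, nullable = true)

  // Same shape as the Java employeeSchema: three string fields plus an
  // "addresses" column that holds an array of address structs.
  val addressType: StructType = StructType(Seq("city", "state", "zip").map(stringField))

  val employeeSchema: StructType = StructType(
    Seq("firstName", "lastName", "email").map(stringField) :+
      StructField("addresses", ArrayType(addressType), nullable = true)
  )

  // Returns one (firstName, city) pair per address by exploding the array.
  def run(spark: SparkSession): Array[(String, String)] = {
    import spark.implicits._

    // Hypothetical sample record in the shape employeeSchema expects.
    val lines = Seq(
      """{"firstName":"Ann","lastName":"Lee","email":"ann@example.com","addresses":[{"city":"Austin","state":"TX","zip":"73301"},{"city":"Boston","state":"MA","zip":"02101"}]}"""
    ).toDS()

    spark.read.schema(employeeSchema).json(lines)
      .select(col("firstName"), explode(col("addresses")).as("address"))
      .select(col("firstName"), col("address.city"))
      .as[(String, String)]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NestedArraySchema")
      .master("local[*]")
      .getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

Since the sample record has two addresses, `run` should return two `(firstName, city)` pairs; fields inside each exploded struct are again reached with dot notation.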
      
