【问题标题】:About how to create a custom org.apache.spark.sql.types.StructType schema object starting from a json file programmatically关于如何以编程方式从 json 文件开始创建自定义 org.apache.spark.sql.types.StructType 模式对象
【发布时间】:2016-11-14 22:13:21
【问题描述】:

我必须使用 json 文件中的信息创建自定义 org.apache.spark.sql.types.StructType 模式对象,json 文件可以是任何东西,所以我在属性文件中对其进行了参数化。

这是属性文件的外观:

//ruta al esquema del fichero output (por defecto se infiere el esquema del Parquet destino). Si existe, el esquema será en formato JSON, aplicable a DataFrame (ver StructType.fromJson)
schema.parquet=/Users/XXXX/Desktop/generated_schema.json
writing.mode=overwrite
separator=;
header=false

文件 generated_schema.json 看起来像:

{"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}

所以,这就是我认为我可以解决的方法:

val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually( inputStream.readLine))

System.out.println("schema_json looks like "  + schema_json.head)

val mySchemaStructType :DataType = DataType.fromJson(schema_json.head)

/*
After this line, mySchemaStructType have four StructFields objects inside it, the same than appears at schema_json
*/
logger.info(mySchemaStructType)

val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)

/*

After this line, myStructType have zero StructFields! here must be the bug, myStructType should have the four StructFields that represents the loaded schema json! this must be the error! but how can i construct the necessary StructType object?

*/

myDF = loadCSV(sqlContext, path_input_csv,separator,myStructType,header)
System.out.println("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()

df.write
  .format("com.databricks.spark.csv")
  .option("header", header)
  .option("delimiter",delimiter)
  .option("nullValue","")
  .option("treatEmptyValuesAsNulls","true")
  .mode(saveMode)
  .parquet(pathParquet)

当代码运行最后一行 .parquet(pathParquet) 时,发生异常:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

这段代码的输出是这样的:

16/11/11 13:57:04 INFO AnotherCSVtoParquet$: The job started using this propertie file: /Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_input_csv is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_output_parquet  is /Users/aisidoro/Desktop/output900000
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: mra_schema_parquet is /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: writting_mode is overwrite
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: separator is ;
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: header is false
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: ATTENTION! aplying mra_schema_parquet  /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
schema_json looks like {"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
16/11/11 13:57:12 INFO AnotherCSVtoParquet$: StructType(StructField(codigo,StringType,true), StructField(otro,StringType,true), StructField(vacio,StringType,true), StructField(final,StringType,true))
 16/11/11 13:57:13 INFO AnotherCSVtoParquet$: loadCSV. header is false, inferSchema is false pathCSV is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv separator is ;
 myDF.schema.json looks like {"type":"struct","fields":[]}

应该是 schema_json 对象和 myDF.schema.json 对象应该有相同的内容,不是吗?但它没有发生。我认为这必须启动错误。

最后这个工作被这个例外压垮了:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

事实是,如果我不提供任何 json 架构文件,则作业执行良好,但使用此架构...

有人可以帮我吗?我只想从 csv 文件和 json 模式文件开始创建一些 parquet 文件。

谢谢。

依赖项是:

    <spark.version>1.5.0-cdh5.5.2</spark.version>
    <databricks.version>1.5.0</databricks.version>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>${databricks.version}</version>
    </dependency>

更新

我可以看到有一个未解决的问题,

https://github.com/databricks/spark-csv/issues/61

【问题讨论】:

    标签: scala spark-csv


    【解决方案1】:

    既然你说Custom Schema,你就可以这样做了。

    val schema = (new StructType).add("field1", StringType).add("field2", StringType)
    sqlContext.read.schema(schema).json("/json/file/path").show
    

    另外,请查看thisthis

    您可以创建嵌套的 JSON Schema,如下所示。

    例如:

    {
      "field1": {
        "field2": {
          "field3": "create",
          "field4": 1452121277
        }
      }
    }
    
    val schema = (new StructType)
      .add("field1", (new StructType)
        .add("field2", (new StructType)
          .add("field3", StringType)
          .add("field4", LongType)
        )
      )
    

    【讨论】:

    • 谢谢 Shankar,使用这种方法,我必须读取带有架构的子文件才能创建架构,但是,如果结构不平坦怎么办?
    • @aironman:更新了我的答案,您可以像链接中提供的那样创建嵌套的 json 架构。
    • 谢谢Shankar,我真的很感激,但我只知道嵌套的json模式文件会随着时间的推移而不同,所以我不知道它会如何发展,模式是动态的。 ¿ 在我解析 json 模式对象文件时,有没有更好的方法来动态创建此模式对象?
    • @aironman:我没有尝试读取没有模式的嵌套 json,但它应该可以工作,我的意思是val df =sqlContext.read.json("/json/file/path"),你不需要传递模式,但它仍然返回 DataFrame。跨度>
    【解决方案2】:

    我终于找到了问题所在。

    问题出在以下几行:

    val myStructType = new StructType()
    myStructType.add("mySchemaStructType",mySchemaStructType)
    

    我必须使用这条线:

    val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]
    

    我必须从 DataType 转换 StructType 才能使事情正常进行。

    【讨论】:

      猜你喜欢
      • 2018-11-29
      • 1970-01-01
      • 2021-11-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-03-21
      • 2013-08-02
      相关资源
      最近更新 更多