【问题标题】:Avro Schema to spark StructType用于激发 StructType 的 Avro Schema
【发布时间】:2016-02-27 05:37:15
【问题描述】:

这实际上与我的previous question 相同,但使用 Avro 而不是 JSON 作为数据格式。

我正在使用一个 Spark 数据框,它可以从几个不同的架构版本之一加载数据:

// Version One
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null}
 ]
}

// Version Two
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}

我正在使用Spark Avro 加载数据。

DataFrame df = context.read()
  .format("com.databricks.spark.avro")
  .load("path/to/avro/file");

可能是版本一文件或版本二文件。但是,我希望能够以相同的方式处理它,将未知值设置为“null”。我上一个问题中的建议是设置架构,但是我不想重复自己在.avro 文件和火花StructType 和朋友中编写架构。如何将 avro 架构(文本文件或生成的 MeObject.getClassSchema())转换为 sparks StructType

Spark Avro 有一个 SchemaConverters,但它是私有的,并返回一些奇怪的内部对象。

【问题讨论】:

标签: java apache-spark apache-spark-sql avro


【解决方案1】:

免责声明:这是一种肮脏的黑客行为。这取决于几件事:

  • Python 提供了lightweight Avro processing library,并且由于它的动态性,它不需要类型化的编写器
  • 空的 Avro 文件仍然是有效的文档
  • Spark 架构可以与 JSON 相互转换

以下代码读取 Avro 架构文件,创建具有给定架构的空 Avro 文件,使用 spark-csv 读取它并将 Spark 架构输出为 JSON 文件。

import argparse
import tempfile

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from pyspark import SparkContext
from pyspark.sql import SQLContext

def parse_schema(schema):
    with open(schema) as fr:
        return avro.schema.parse(open(schema).read())

def write_dummy(schema):
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, "w") as fw:
        writer = DataFileWriter(fw, DatumWriter(), schema)
        writer.close()
    return tmp

def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())


def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()

    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)

    df = (sqlContext.read.format('com.databricks.spark.avro')
            .load(write_dummy(parse_schema(args.schema))))

    write_spark_schema(args.output, df.schema)
    sc.stop()


if __name__ == '__main__':
    main()

用法:

bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \ 
   avro_to_spark_schema.py \
   --schema path_to_avro_schema.avsc \
   --output path_to_spark_schema.json

读取架构:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]

【讨论】:

    【解决方案2】:

    请看看这是否有帮助,虽然有点晚了。我正在为我目前的工作努力。我使用了 Databricks 的 schemaconverter。我想,您正在尝试使用给定架构读取 avro 文件。

     val schemaObj = new Schema.Parser().parse(new File(avscfilepath));
     var sparkSchema : StructType = new StructType
     import scala.collection.JavaConversions._     
     for(field <- schemaObj.getFields()){
      sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
     }
     sparkSchema
    

    【讨论】:

      猜你喜欢
      • 2019-05-05
      • 2017-04-08
      • 1970-01-01
      • 2019-12-21
      • 1970-01-01
      • 2019-12-17
      • 1970-01-01
      • 1970-01-01
      • 2020-07-18
      相关资源
      最近更新 更多