【Question title】:How to read Avro Binary (Base64) Encoded data in Spark Scala
【Posted】:2019-10-14 00:46:17
【Question】:

I am trying to read an Avro file whose content is binary (Base64) encoded and Snappy-compressed. Running Hadoop cat on the Avro file gives output like this:

Objavro.schema? 
{"type":"record","name":"ConnectDefault","namespace":"xyz.connect.avro","fields": 
[{"name":"service","type":"string"},{"name":"timestamp","type":"long"}, 
{"name":"count","type":"int"},{"name":"encoderKey","type":{"type":"map","values":"string"}}, 
{"name":"schema","type":"string"},{"name":"data","type":"string"}]}>??n]

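I need to extract and read the "schema" and "data" fields from the file above. The "schema" field describes the "data" field, which itself contains multiple fields.

I tried the following steps:

1. Read the binary file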

val binaryFilesRDD = sc.binaryFiles("file+0+00724+00731.avro").map { x => ( x._2.toArray) }
binaryFilesRDD: org.apache.spark.rdd.RDD[Array[Byte]] = MapPartitionsRDD[1] at map at 
<console>:24
2. Convert RDD[Array[Byte]] to Array[Byte]
scala> val newArray = binaryFilesRDD.collect().flatten 
    newArray: Array[Byte] = Array(17, 18, 16, 51, 24, 22, 17, 18, 117, 151, 76, 105, 95, 124....
3. Call the following method with newArray (an Array[Byte]) to get records from the bytes
    def getGenericRecordfromByte(inputData:Array[Byte], inputDataSchema: Schema): GenericRecord = 
    {
        val datareader = new GenericDatumReader[GenericRecord](inputDataSchema)
        val datadecoder = DecoderFactory.get.binaryDecoder(inputData, null)
        datareader.read(null, datadecoder) 
    }

But I get the following error:

    scala> val newDataRecords = getGenericRecordfromByte(newArray,inputDataSchema)
    org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
  at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
  at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
  at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
  at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)


Please advise.

【Comments】:

    Tags: scala apache-spark binary decoder spark-avro


    【Solution 1】:

    Start spark-shell like this:

    spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.4
    

    Or, depending on your installation, like this:

    spark2-shell --packages org.apache.spark:spark-avro_2.11:2.4.4
    

    Then run:

    spark.read.format("com.databricks.spark.avro").load("/file/path")
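
Loading the file this way only exposes "schema" and "data" as plain string columns; the payload itself still has to be decoded. The following is a minimal sketch of that second step, assuming (from the question title, not confirmed by the thread) that each "data" value is a Base64 string wrapping a single Avro-binary datum written with the schema held in the "schema" column. `decodePayload` is a hypothetical helper name, meant to be pasted into spark-shell.

```scala
import java.util.Base64
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical helper: decode one Base64 string that is assumed to hold
// a single Avro-binary datum written with the schema in `schemaJson`.
def decodePayload(schemaJson: String, dataBase64: String): GenericRecord = {
  val schema  = new Schema.Parser().parse(schemaJson)   // parse the per-row schema
  val bytes   = Base64.getDecoder.decode(dataBase64)    // undo the Base64 layer
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
  reader.read(null, decoder)                            // decode the Avro datum
}
```

In spark-shell this could then be applied per row, for example `df.select("schema", "data").rdd.map(r => decodePayload(r.getString(0), r.getString(1)).toString)`, where `df` is the DataFrame loaded above. Converting to a string before leaving the closure is a deliberate choice here, since Avro's generic records are generally not Java-serializable.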
    

    【Comments】:

      【Solution 2】:

      For Spark 2.3.x and earlier (https://github.com/databricks/spark-avro/blob/branch-4.0/README-for-old-spark-versions.md):

      spark-shell --packages com.databricks:spark-avro_2.11:4.0.0 ...
      

      Then, in your code:

      val avro = spark.read.format("com.databricks.spark.avro").load("/path/")
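
If neither Spark Avro package is an option, the file can probably also be read with Avro's own Java API. The `Obj` marker followed by `avro.schema` in the question's dump suggests an Avro object container file, and that would explain the "Length is negative" error: `binaryDecoder` was handed the container header rather than a bare record. The sketch below is one possible approach, not something the thread itself proposes; `readContainer` is a hypothetical name, and a Snappy-compressed file needs the Snappy codec library on the classpath.

```scala
import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: read every record from the bytes of ONE Avro
// container file. The writer schema is taken from the file header.
def readContainer(fileBytes: Array[Byte]): List[GenericRecord] = {
  val stream = new DataFileStream[GenericRecord](
    new ByteArrayInputStream(fileBytes),
    new GenericDatumReader[GenericRecord]())
  try {
    val out = ArrayBuffer.empty[GenericRecord]
    while (stream.hasNext) out += stream.next()  // block decompression is handled by the stream
    out.toList
  } finally stream.close()
}
```

With the RDD from the question this would be called once per file, for example `sc.binaryFiles(path).flatMap { case (_, pds) => readContainer(pds.toArray).map(_.toString) }`, rather than on the flattened bytes of all files together.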
      

      【Comments】:
