【Posted】: 2019-10-14 00:46:17
【Question】:
I am trying to read an Avro file that is binary (Base64) encoded and Snappy-compressed. The output of hadoop cat on the Avro file looks like:
Objavro.schema?
{"type":"record","name":"ConnectDefault","namespace":"xyz.connect.avro","fields":
[{"name":"service","type":"string"},{"name":"timestamp","type":"long"},
{"name":"count","type":"int"},{"name":"encoderKey","type":{"type":"map","values":"string"}},
{"name":"schema","type":"string"},{"name":"data","type":"string"}]}>??n]
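I need to extract and read the "schema" and "data" fields from the file above. The "schema" is associated with the "data", which has multiple fields.
I tried the following steps:
1. Read the binary file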
val binaryFilesRDD = sc.binaryFiles("file+0+00724+00731.avro").map { x => ( x._2.toArray) }
binaryFilesRDD: org.apache.spark.rdd.RDD[Array[Byte]] = MapPartitionsRDD[1] at map at <console>:24
- Convert the RDD[Array[Byte]] to an Array[Byte]
scala> val newArray = binaryFilesRDD.collect().flatten
newArray: Array[Byte] = Array(17, 18, 16, 51, 24, 22, 17, 18, 117, 151, 76, 105, 95, 124....
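- Call the following method with newArray (an Array[Byte]) to get records from the bytes
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Decodes a single Avro binary datum (no container header) using the given schema.
def getGenericRecordfromByte(inputData: Array[Byte], inputDataSchema: Schema): GenericRecord = {
  val datareader = new GenericDatumReader[GenericRecord](inputDataSchema)
  val datadecoder = DecoderFactory.get.binaryDecoder(inputData, null)
  datareader.read(null, datadecoder)
}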
But I get the following error.
scala> val newDataRecords = getGenericRecordfromByte(newArray,inputDataSchema)
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
Please advise.
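A note on where the -40 may come from. Avro's binary encoding stores a string's length as a zigzag varint, and an Avro container file starts with the magic bytes O, b, j, 1 (the "Obj" at the top of the hadoop cat output). If the raw file bytes are passed straight to a BinaryDecoder, as getGenericRecordfromByte does, the first byte 'O' (0x4F) would be read as the length of the first string field, and 0x4F zigzag-decodes to -40. The sketch below reproduces that arithmetic in plain Scala. ZigZagDemo and readZigZagLong are illustrative names, not Avro APIs, and the sketch assumes the array really begins with the container header.

```scala
object ZigZagDemo {

  /** Decodes one variable-length zigzag long, as used by Avro's binary
    * encoding for int/long values and for string/bytes lengths.
    * Illustrative helper only; not part of the Avro library.
    */
  def readZigZagLong(bytes: Array[Byte], offset: Int = 0): Long = {
    var raw = 0L
    var shift = 0
    var i = offset
    var more = true
    while (more) {
      val b = bytes(i) & 0xff
      raw |= (b & 0x7f).toLong << shift // low 7 bits carry the payload
      more = (b & 0x80) != 0            // high bit set means another byte follows
      shift += 7
      i += 1
    }
    (raw >>> 1) ^ -(raw & 1)            // undo the zigzag mapping
  }

  def main(args: Array[String]): Unit = {
    // Container-file magic: 'O' (0x4f), 'b' (0x62), 'j' (0x6a), version 1.
    val magic = Array[Byte](0x4f, 0x62, 0x6a, 0x01)
    println(readZigZagLong(magic)) // prints -40
  }
}
```

If this is what is happening, the schema passed in is not the problem; the decoder is simply starting at the file header instead of at a record.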
【Discussion】:
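One possible approach, sketched under assumptions rather than tested against the actual file: the "Objavro.schema" prefix suggests an Avro object container file, which Avro's DataFileStream can read directly. It takes the writer schema from the header and decompresses blocks with the codec recorded there (Snappy needs snappy-java on the classpath). ContainerFileSketch, readContainer, and decodeInner are hypothetical names. decodeInner further assumes that the "data" field is Base64 text wrapping a single Avro binary datum written with the schema stored in the "schema" field; the question does not confirm that layout.

```scala
import java.io.ByteArrayInputStream
import java.util.Base64

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

import scala.collection.mutable.ArrayBuffer

object ContainerFileSketch {

  /** Reads every outer record from the bytes of one Avro container file. */
  def readContainer(fileBytes: Array[Byte]): Seq[GenericRecord] = {
    // No schema is passed in: DataFileStream uses the one embedded in the header.
    val stream = new DataFileStream[GenericRecord](
      new ByteArrayInputStream(fileBytes),
      new GenericDatumReader[GenericRecord]())
    try {
      val records = ArrayBuffer.empty[GenericRecord]
      while (stream.hasNext) records += stream.next()
      records.toList
    } finally stream.close()
  }

  /** ASSUMPTION: "data" is Base64 of one Avro binary datum whose schema is
    * the JSON held in the "schema" field of the same outer record.
    */
  def decodeInner(outer: GenericRecord): GenericRecord = {
    val innerSchema = new Schema.Parser().parse(outer.get("schema").toString)
    val payload = Base64.getDecoder.decode(outer.get("data").toString)
    val decoder = DecoderFactory.get.binaryDecoder(payload, null)
    new GenericDatumReader[GenericRecord](innerSchema).read(null, decoder)
  }
}
```

For the single file in the question, the call would look like ContainerFileSketch.readContainer(newArray).map(ContainerFileSketch.decodeInner). Note that flattening the bytes of several files into one array would not work with this reader, since each container file carries its own header.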
Tags: scala apache-spark binary decoder spark-avro