【问题标题】:Is there a better way to grab a list of Avro records from an InputStream?有没有更好的方法从 InputStream 中获取 Avro 记录列表?
【发布时间】:2019-03-28 05:46:23
【问题描述】:

我有一个ByteArrayInputStream,它是用一个List<TestAvroModel> 序列化的,它是SpecificRecord 的一个实现。我找不到让 Avro 了解已序列化列表的方法,所以我采用了一种骇人听闻的方式来遍历 ByteArrayInputStream


//TestAvroModel is an implementation of SpecificRecord
List<TestAvroModel> models;
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
for(TestAvroModel model: models) {
    DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(model.getSchema());
    Encoder encoder = new EncoderFactory().binaryEncoder(byteArrayStream, null);

    writer.write(model, encoder);
    encoder.flush();
}

//This was pre-serialized with a List of TestAvroModel
ByteArrayInputStream inputStream;

DatumReader<TestAvroModel> reader = new SpecificDatumReader<>(TestAvroModel.getClassSchema());
Decoder decoder = DecoderFactory().get().binaryDecoder(inputStream, null);

List<TestAvroModel> records = new ArrayList<>();
boolean eof = false;
while(!eof) {
    try {
        records.add(reader.read(null, decoder));
    catch(EOFException ex) {
        eof = true;
    }
}

这种方式有效并一次读取序列化的List&lt;TestAvroModel&gt; 并将其添加到我的记录列表中。虽然循环通过DatumReader 直到EOFException 似乎不是最好的方法,但我还没有找到更好的方法。

我在 Avro 库中找不到任何处理 InputStream 的内容,其中包含多个 Avro 记录。尽管 Avro 必须在流中具有断点才能像我上面那样读取单个记录。重申一下,有没有人知道循环通过DatumReader 然后上面显示的方式更好的方法?

【问题讨论】:

  • 我很好奇你是怎么序列化你的数据的,你能和我们分享一下吗?如何使用记录列表动态创建模式来处理此问题?
  • @hlagos 我已经用一段关于如何序列化测试数据的代码更新了我的问题。另外,我对您为记录列表动态创建架构的意思感到困惑。

标签: java avro bytearrayinputstream


【解决方案1】:

Decoder 似乎为此定义了isEnd(): Boolean

如果当前 BinaryDecoder 位于其源的末尾,则返回 true 如果不抛出 EOFException 或 其他 IOException。

这应该可行:

...
while(!decoder.isEnd()) {
  records.add(reader.read(null, decoder));
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-05-14
    • 1970-01-01
    • 2021-07-05
    • 2020-09-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多