【问题标题】:Read parquet data from ByteArrayOutputStream instead of file从 ByteArrayOutputStream 而不是文件读取镶木地板数据
【发布时间】:2020-01-28 04:12:53
【问题描述】:

我想转换这段代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ParquetReaderUtils {

    public static Parquet getParquetData(String filePath) throws IOException {
        List<SimpleGroup> simpleGroups = new ArrayList<>();
        ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()));
        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
        //List<Type> fields = schema.getFields();
        PageReadStore pages;
        while ((pages = reader.readNextRowGroup()) != null) {
            long rows = pages.getRowCount();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

            for (int i = 0; i < rows; i++) {
                SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
                simpleGroups.add(simpleGroup);
            }
        }
        reader.close();
        return new Parquet(simpleGroups, schema);
    }
}

(来自https://www.arm64.ca/post/reading-parquet-files-java/

采用 ByteArrayOutputStream 参数而不是 filePath。

这可能吗?我在 org.apache.parquet.hadoop 中没有看到 ParquetStreamReader。

感谢任何帮助。我正在尝试为来自 kafka 的镶木地板编写一个测试应用程序,并且将许多消息中的每一条都写入文件相当慢。

【问题讨论】:

  • 只需实现 org.apache.parquet.io.InputFile 接口,就像 org.apache.parquet.hadoop.util.HadoopInputFile 一样。您唯一需要做的就是从输出流中创建一个字节数组,从中创建一个字节数组输入流并将其传递给 org.apache.parquet.io.DelegatingSeekableInputStream
  • 感谢您的提示,@m4gic。赏金到期后我会试试这个。

标签: java parquet bytearrayoutputstream


【解决方案1】:

所以如果没有更深入的测试,我会尝试使用这个类(尽管输出流的内容应该是 parquet 兼容的)。我放了一个streamId,以便更容易识别已处理的字节数组(如果出现问题,ParquetFileReader 会打印 instance.toString())。

public class ParquetStream implements InputFile {
    private final String streamId;
    private final byte[] data;

    private static class SeekableByteArrayInputStream extends ByteArrayInputStream {
        public SeekableByteArrayInputStream(byte[] buf) {
            super(buf);
        }

        public void setPos(int pos) {
            this.pos = pos;
        }

        public int getPos() {
            return this.pos;
        }
    }

    public ParquetStream(String streamId, ByteArrayOutputStream stream) {
        this.streamId = streamId;
        this.data = stream.toByteArray();
    }

    @Override
    public long getLength() throws IOException {
        return this.data.length;
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data)) {
            @Override
            public void seek(long newPos) throws IOException {
                ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos);
            }

            @Override
            public long getPos() throws IOException {
                return ((SeekableByteArrayInputStream) this.getStream()).getPos();
            }
        };
    }

    @Override
    public String toString() {
        return "ParquetStream[" + streamId + "]";
    }
}

【讨论】:

  • 我成功地使用了这种方法。对于我自己的实现,我还创建了一个ByteArraySeekableInputStream 类,它将SeekableByteArrayInputStream 委托存储在一个字段中,而不是声明一个匿名内部类。这避免了((SeekableByteArrayInputStream) this.getStream()) 演员表。
  • 非常好的替代巨大的 hadoop-aws 依赖,谢谢。 +1
猜你喜欢
  • 2022-06-16
  • 1970-01-01
  • 2019-08-04
  • 2019-09-23
  • 2020-02-01
  • 2017-01-22
  • 2021-11-10
  • 2018-05-06
  • 2017-12-27
相关资源
最近更新 更多