【发布时间】:2020-01-28 04:12:53
【问题描述】:
我想转换这段代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ParquetReaderUtils {
public static Parquet getParquetData(String filePath) throws IOException {
List<SimpleGroup> simpleGroups = new ArrayList<>();
ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()));
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
//List<Type> fields = schema.getFields();
PageReadStore pages;
while ((pages = reader.readNextRowGroup()) != null) {
long rows = pages.getRowCount();
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
for (int i = 0; i < rows; i++) {
SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
simpleGroups.add(simpleGroup);
}
}
reader.close();
return new Parquet(simpleGroups, schema);
}
}
(来自https://www.arm64.ca/post/reading-parquet-files-java/)
采用 ByteArrayOutputStream 参数而不是 filePath。
这可能吗?我在 org.apache.parquet.hadoop 中没有看到 ParquetStreamReader。
感谢任何帮助。我正在尝试为来自 kafka 的镶木地板编写一个测试应用程序,并且将许多消息中的每一条都写入文件相当慢。
【问题讨论】:
-
只需实现 org.apache.parquet.io.InputFile 接口,就像 org.apache.parquet.hadoop.util.HadoopInputFile 一样。您唯一需要做的就是从输出流中创建一个字节数组,从中创建一个字节数组输入流并将其传递给 org.apache.parquet.io.DelegatingSeekableInputStream
-
感谢您的提示,@m4gic。赏金到期后我会试试这个。
标签: java parquet bytearrayoutputstream