【发布时间】:2018-12-12 00:06:22
【问题描述】:
我必须从 Cloud Storage 读取一个 AVRO 文件,然后将记录写入一个带有行键的大表中,并将 AVRO 作为列单元格中的字节。我正在使用 AVROIO.read 将数据读取为 GenericRecord 。 . 如何应用 pardo 函数将数据转换为可以写入 bigtable 的内容
// Read AVRO from GCS
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
//.apply - pardo transformation
.apply("Write to Bigtable", write);
非常感谢您对管道第二步的任何帮助
更新:
感谢 Anton 的快速帮助,我现在明白我必须做什么,并想出了下面的 pardo
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
.apply(ParDo.of(new DoFn<GenericRecord, Iterable<Mutation> >() {
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord gen = c.element();
byte[] fieldNameByte = null;
byte[] fieldValueByte = null;
// ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
for (Schema.Field field : fields) {
try {
String fieldName = field.name();
fieldNameByte = fieldName.getBytes("UTF-8");
String value = String.valueOf(gen.get(fieldName));
fieldValueByte = value.getBytes("UTF-8");
} catch (Exception e) {
e.printStackTrace();
}
Iterable<Mutation> mutations =
ImmutableList.of(
Mutation.newBuilder()
.setSetCell(
Mutation.SetCell.newBuilder()
.setValue(
ByteString.copyFrom(fieldValueByte))
.setFamilyName(COLUMN_FAMILY_NAME))
.build());
c.output(,mutations));
}
}
}))
.apply("Write to Bigtable", write);
return pipeline.run();
这只是一个伪代码,我只是在学习和尝试.. 我需要帮助来将突变添加到 ProcessContext 并进行写入.. 请看一下,让我知道我是否在正确的方向以及如何将突变添加到上下文中
【问题讨论】:
标签: google-cloud-dataflow apache-beam bigtable