【问题标题】:How to write Avro files from Apache Beam Row如何从 Apache Beam Row 写入 Avro 文件
【发布时间】:2021-04-09 02:17:07
【问题描述】:

在我的 Apache Beam 管道中,我有一个 Row 对象的 PCollection (org.apache.beam.sdk.values.Row)。我想写入 Avro 文件。这是我的代码的简化版本:

   Pipeline p = Pipeline.create();

    Schema inputSchema = Schema.of(
            Schema.Field.of("MyField1", Schema.FieldType.INT32)
    );

    Row row = Row.withSchema(inputSchema).addValues(1).build();
    PCollection<Row> myRow = p.apply(Create.of(row)).setCoder(RowCoder.of(inputSchema));

    myRow.apply(
            "WriteToAvro",
            AvroIO.write(Row.class)
                    .to("src/tmp/my_files")
                    .withWindowedWrites()
                    .withNumShards(10));
    p.run();

文件已创建,但看起来像这样(JSON 格式):

"schema" : {
    "fieldIndices" : {
        "MyField1" : 0
    },
    "encodingPositions" : {
        "MyField1" : 0
    },
    "fields" : [
        {
        }
    ],
    "hashCode" : 545134948,
    "uuid" : {
    },
    "options" : {
        "options" : {
        }
    }
}

所以只有模式存在一堆无用的元数据。从 Row 对象写入 Avro 的正确方法是什么,以便我拥有数据而不仅仅是架构。我可以摆脱元数据吗?

【问题讨论】:

  • 你用什么来检查文件内容?
  • @OneCricketeer 我正在使用在线查看器; dataformat.net/avro/viewer-and-converter
  • 我建议使用 avro-tools jar 文件,因为不清楚该站点实际在做什么
  • 刚刚试了一下。获取完全相同的 JSON :/

标签: apache-beam avro apache-beam-io


【解决方案1】:

AvroUtils 有一些实用程序可以将 Rows 和 Beam 模式转换为 Avro 类型。你可以这样做:

 Pipeline p = Pipeline.create();

    Schema inputSchema = Schema.of(
            Schema.Field.of("MyField1", Schema.FieldType.INT32)
    );
    avro.Schema avroSchema = AvroUtils.toAvroSchema(inputSchema)

    class ConvertToGenericRecords extends DoFn<Row, GenericRecord> {
      @ProcessElement
      public void process(ProcessContext<Row> c) {
        c.output(AvroUtils.toGenericRecord(c.element(), avroSchema));
      }
    }
    

    Row row = Row.withSchema(inputSchema).addValues(1).build();
    PCollection<Row> myRow = p.apply(Create.of(row)).setCoder(RowCoder.of(inputSchema));

    myRow.apply(ParDo.of(new ConvertToGenericRecords()))
         .withCoder(AvroCoder.of(avroSchema)
         .apply(
            "WriteToAvro",
            AvroIO.writeGenericRecords(avroSchema)
                    .to("src/tmp/my_files")
                    .withWindowedWrites()
                    .withNumShards(10));
    p.run();

【讨论】:

  • 感谢@Pablo。我试了一下,得到以下错误: java.lang.IllegalArgumentException: 无法序列化 DoFnWithExecutionInformation{doFn=PipelineTest$1ConvertToGenericRecords@7134b8a7, mainOutputTag=Tag, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}在 org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59) ---看起来它无法序列化某些东西。关于它可能是什么以及如何解决它的任何想法?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-08-11
  • 2017-09-03
  • 1970-01-01
  • 2018-11-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多