【问题标题】:Avro and Kafka by making use of SchemaBuilderAvro 和 Kafka 通过使用 SchemaBuilder
【发布时间】:2019-06-09 07:25:35
【问题描述】:

我通过baeldung 完成了教程。他们提到有两种方法可以创建模式。

  • 通过编写json表示并添加maven插件来生成类
  • 通过使用SchemaBuilder,他们还提到这是一个更好的选择。

不幸的是,在 git 示例中,我只看到了 json 方式。

假设我有这个 Avro 架构:

{
  "type":"record",
  "name":"TestFile",
  "namespace":"com.example.kafka.data.ingestion.model",
  "fields":[
    {
      "name":"date",
      "type":"long"
    },
    {
      "name":"counter",
      "type":"int"
    },
    {
      "name":"mc",
      "type":"string"
    }
  ]
}

通过在我的 pom 文件中添加这个插件:

<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.8.0</version>
   <executions>
      <execution>
         <id>schemas</id>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
            <goal>protocol</goal>
            <goal>idl-protocol</goal>
         </goals>
         <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
         </configuration>
      </execution>
   </executions>
</plugin>

并使用 generate-sources 构建 TestFile.java 到我说的目的地。 然后,为了发送到 kafka 主题,我可以执行以下操作:

TestFile test = TestFile.newBuilder()
                                            .setDate(102928374747)
                                            .setCounter(2)
                                            .setMc("Some string")
                                            .build();
kafkaTemplate.send(topicName, test);

使用SchemaBuilder 创建架构的等价物是:

Schema testFileSchema = SchemaBuilder   .record("TestFile")
                                            .namespace("com.example.kafka.data.ingestion.model")
                                            .fields()
                                            .requiredLong("date")
                                            .requiredInt("counter")
                                            .requiredString("mc")
                                            .endRecord();

但是我现在如何生成 POJO 并将我的 TestFile 数据发送到我的 kafka 主题?

【问题讨论】:

    标签: java apache-kafka avro


    【解决方案1】:

    您将无法访问 TestFile 对象,因为 Schema 是在运行时创建的,而不是预编译的。如果你想保留那个 POJO,那么你需要一个 public TestFile(GenericRecord avroRecord) 的构造函数

    您需要使用该Schema 对象创建一个GenericRecord,就像您从字符串或文件中解析它一样。

    例如,

    Schema schema = SchemaBuilder.record("TestFile")
                .namespace("com.example.kafka.data.ingestion.model")
                .fields()
                .requiredLong("date")
                .requiredInt("counter")
                .requiredString("mc")
                .endRecord();
    
    GenericRecord entry1 = new GenericData.Record(schema);
    entry1.put("date", 1L);
    entry1.put("counter", 2);
    entry1.put("mc", "3");
    
    // producer.send(new ProducerRecord<>(topic, entry1);
    

    完整的 Kafka 示例是 available from Confluent

    如果你放不包含必填字段,它会抛出一个错误,并且类型的值不会被检查(我可以放"counter", "2",它会发送一个字符串值(这似乎是对我来说是一个错误)。基本上,GenericRecord == HashMap&lt;String, Object&gt; 具有必需/可空字段的额外好处。

    你需要配置一个 Avro 序列化器,比如 Confluent 的,它需要运行他们的 Schema Registry,或者像 Cloudera shows 这样的版本

    否则,您需要将 Avro 对象转换为 byte[](如您的链接所示,只需使用 ByteArraySerializer

    【讨论】:

      【解决方案2】:

      如 Baeldung 教程中所述:

      稍后我们可以应用 toString 方法获取 JSON 结构 架构。

      因此,例如在主类中使用此代码,您可以将两个模式定义打印到控制台输出。

      然后您可以将生成的 json 表示保存到 .avsc 文件并像以前一样生成 pojo。

          Schema clientIdentifier = SchemaBuilder.record("ClientIdentifier")
                  .namespace("com.baeldung.avro")
                  .fields().requiredString("hostName").requiredString("ipAddress")
                  .endRecord();
          System.out.println(clientIdentifier.toString());
      
          Schema avroHttpRequest = SchemaBuilder.record("AvroHttpRequest")
                  .namespace("com.baeldung.avro")
                  .fields().requiredLong("requestTime")
                  .name("clientIdentifier")
                  .type(clientIdentifier)
                  .noDefault()
                  .name("employeeNames")
                  .type()
                  .array()
                  .items()
                  .stringType()
                  .arrayDefault(new ArrayList<>())
                  .name("active")
                  .type()
                  .enumeration("Active")
                  .symbols("YES","NO")
                  .noDefault()
                  .endRecord();
          System.out.println(avroHttpRequest.toString());
      

      还有第三种方法可以使用Avro IDL 生成 Avro 模式

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-12-18
        • 1970-01-01
        • 2020-10-12
        • 1970-01-01
        • 2019-12-27
        • 1970-01-01
        相关资源
        最近更新 更多