【问题标题】:How Kafka producer find the schema Id for a recordKafka 生产者如何找到记录的模式 ID
【发布时间】:2021-08-09 00:08:48
【问题描述】:

在 Kafka 生产者中,我可以看到我们只需要指定模式注册表 url 而不是我想要的模式。因此,在记录序列化时,生产者如何决定使用哪个模式。因为架构注册表可以托管多个架构。

https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry

请参阅上面网址中给出的以下示例。在这里,我看不到架构 ID,而只有注册表 url。那么生产者如何找到正确的架构?

公共类 AvroProducer {

private static Producer<Long, Employee> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            LongSerializer.class.getName());

    // Configure the KafkaAvroSerializer.
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class.getName());

    // Schema Registry location.
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8081");

    return new KafkaProducer<>(props);
}

private final static String TOPIC = "new-employees";

public static void main(String... args) {

    Producer<Long, Employee> producer = createProducer();

    Employee bob = Employee.newBuilder().setAge(35)
            .setFirstName("Bob")
            .setLastName("Jones")
            .setPhoneNumber(
                    PhoneNumber.newBuilder()
                            .setAreaCode("301")
                            .setCountryCode("1")
                            .setPrefix("555")
                            .setNumber("1234")
                            .build())
            .build();

    IntStream.range(1, 100).forEach(index->{
        producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));

    });

    producer.flush();
    producer.close();
}

}

【问题讨论】:

  • 你有示例代码吗?

标签: java apache-kafka avro confluent-schema-registry


【解决方案1】:

您发送的 POJO 类被序列化以包含架构,该架构在二进制数据负载发送到 Kafka 之前以纯文本形式发送到注册表。

当在服务器端接收到注册表 HTTP 请求时,它使用 MD5 哈希对一些内部哈希映射来检查/插入模式文本哈希以及主题/Avro 记录(注册表“主题”)名称,然后返回在 HTTP 响应中将数字 ID 返回给序列化程序以创建 Kafka 消息负载。

如果您想了解更多信息,Schema Registry 代码是开源的

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-29
    • 2016-06-16
    • 1970-01-01
    • 2019-03-31
    • 2021-04-19
    • 1970-01-01
    • 1970-01-01
    • 2021-04-25
    相关资源
    最近更新 更多