【发布时间】:2021-08-09 00:08:48
【问题描述】:
在 Kafka 生产者中,我可以看到我们只需要指定模式注册表 url 而不是我想要的模式。因此,在记录序列化时,生产者如何决定使用哪个模式。因为架构注册表可以托管多个架构。
https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry
请参阅上面网址中给出的以下示例。在这里,我看不到架构 ID,而只有注册表 url。那么生产者如何找到正确的架构?
公共类 AvroProducer {
private static Producer<Long, Employee> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class.getName());
// Configure the KafkaAvroSerializer.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getName());
// Schema Registry location.
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:8081");
return new KafkaProducer<>(props);
}
private final static String TOPIC = "new-employees";
public static void main(String... args) {
Producer<Long, Employee> producer = createProducer();
Employee bob = Employee.newBuilder().setAge(35)
.setFirstName("Bob")
.setLastName("Jones")
.setPhoneNumber(
PhoneNumber.newBuilder()
.setAreaCode("301")
.setCountryCode("1")
.setPrefix("555")
.setNumber("1234")
.build())
.build();
IntStream.range(1, 100).forEach(index->{
producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));
});
producer.flush();
producer.close();
}
}
【问题讨论】:
-
你有示例代码吗?
标签: java apache-kafka avro confluent-schema-registry