【问题标题】:SpringBoot Embedded Kafka to produce Event using Avro SchemaSpring Boot Embedded Kafka 使用 Avro Schema 生成事件
【发布时间】:2021-06-09 01:20:56
【问题描述】:

我创建了以下测试类来使用 AvroSerializer 生成事件。

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {


    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}

但是当我使用kafkaTemplate().send(appEventsTopic, applicationEvent);发送事件时,我得到了以下异常。

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)

当我使用 MockSchemaRegistryClient 时,为什么它会尝试查找架构?

【问题讨论】:

  • 您使用的是什么版本的架构注册表依赖项?
  • 编辑了我的答案,看看

标签: java apache-kafka spring-kafka confluent-schema-registry spring-kafka-test


【解决方案1】:

您将生产者设置为不尝试在生成消息时自动注册新架构,因此它只是尝试从 SR 获取并且在 SR 上没有找到其架构。


也没有看到你设置架构注册表 URL 猜测它采用默认值


对于您的问题,mock 正在模仿真实模式注册表的工作,但有其明显的缺点

/**

  • 可用于测试的 SchemaRegistryClient 的模拟实现。这个版本不是
  • 线程安全。模式数据存储在内存中,不是持久的,也不是跨实例共享的。 */

您可以查看文档以获取更多信息

https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java#L47

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-12-20
    • 2018-03-09
    • 2019-08-25
    • 2019-09-13
    • 1970-01-01
    • 2016-12-09
    • 2021-07-01
    • 2018-05-17
    相关资源
    最近更新 更多