【发布时间】:2018-12-27 23:07:18
【问题描述】:
我正在尝试为自定义流处理器编写单元测试,但无法序列化我需要发送以进行测试的消息。我通过 kafka 跟随这个例子:https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html。我在流中为自定义类(自动生成的 avro 类)使用 SpecificAvroSerde,但我无法在测试中使用 MockSchemaRegistryClient() 对其进行配置,我只能指向 SR 的 URL。
Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
Map<String, String> valueSerdeConfig = new HashMap<>();
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
valueSerde.configure(valueSerdeConfig, false);
ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());
使用 KafkaAvroSerializer 我可以像这样初始化它:
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
但 ConsumerRecordFactory 不会将 KafkaAvroSerializer 作为参数。
是否有任何替代方法或我不知道这样做的方法?
如果有任何帮助,我将不胜感激,谢谢。
【问题讨论】:
标签: apache-kafka avro apache-kafka-streams confluent-schema-registry