【发布时间】:2018-10-11 20:14:31
【问题描述】:
我当前的设置有以下配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setMessageConverter(stringJsonMessageConverter());
return factory;
}
stringJsonMessageConverter 在哪里
@Bean
public StringJsonMessageConverter stringJsonMessageConverter() {
return new StringJsonMessageConverter(objectMapper());
}
使用我的对象映射器
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(myCustomJacksonModules())
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
.configure(WRITE_DATES_AS_TIMESTAMPS, false);
}
有了这个配置,我可以发布为:
...
headers = new MessageHeaders(singletonMap(TOPIC, topic));
Foo foo = ....
Message<?> message = new GenericMessage<>(foo, headers);
kafkaTemplate.send(message);
消费为:
@KafkaListener(topics = "myTopic",
groupId = "g1",
containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
... works with foo here
}
如果 Foo 是具体类,则在此示例中可以正常工作。但是,如果 Foo 是抽象的,那么发送的内容是 消息是它的子类
F1 extends Foo; F2 extends Foo ...
... // publisher
headers = new MessageHeaders(singletonMap(TOPIC, topic));
F1 f1 = ....
Message<?> message = new GenericMessage<>(f1, headers);
kafkaTemplate.send(message);
... // Listener
@KafkaListener(topics = "myTopic",
groupId = "g1",
containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
... wont work
}
在我的消费者中将 Foo 声明为类型会失败,但这会起作用:
@KafkaListener(topics = "myTopic",
groupId = "g1",
containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Map<String,Object> fooAsMap) {
... this works too
}
除了上面提到的使用地图的选项外,我还尝试了以下方法:
- class level kafka listener ([from here)][1]; it still requires method with param of type Map
- custom deserializer registered in object mapper?
有没有办法在我发送消息时指定目标类型?
【问题讨论】:
-
按目标类型是指要发送的任何特定 POJO 对象吗?
-
是的。在我的示例中,类型为 F1.. 目标是在从 kafka 到对象的转换期间,即使侦听器被声明为 Foo,它也会知道它是 F1
标签: java apache-kafka spring-kafka