【问题标题】:How to specify class type when using spring kafka template?使用spring kafka模板时如何指定类类型?
【发布时间】:2018-10-11 20:14:31
【问题描述】:

我当前的设置有以下配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new
            ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(stringJsonMessageConverter());

    return factory;
}

stringJsonMessageConverter 在哪里

@Bean
public StringJsonMessageConverter stringJsonMessageConverter() {
    return new StringJsonMessageConverter(objectMapper());
}

使用我的对象映射器

@Bean
public ObjectMapper objectMapper() {
    return new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .registerModule(myCustomJacksonModules())
            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
            .configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
            .configure(WRITE_DATES_AS_TIMESTAMPS, false);
}

有了这个配置,我可以发布为:

...
headers = new MessageHeaders(singletonMap(TOPIC, topic));
Foo foo = ....
Message<?> message = new GenericMessage<>(foo, headers);
kafkaTemplate.send(message);

消费为:

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
    ... works with foo here
}

如果 Foo 是具体类,则在此示例中可以正常工作。但是,如果 Foo 是抽象的,那么发送的内容是 消息是它的子类

F1 extends Foo; F2 extends Foo ...

... // publisher

headers = new MessageHeaders(singletonMap(TOPIC, topic));
F1 f1 = ....
Message<?> message = new GenericMessage<>(f1, headers);
kafkaTemplate.send(message);

... // Listener

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
    ... wont work
}

在我的消费者中将 Foo 声明为类型会失败,但这会起作用:

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Map<String,Object> fooAsMap) {
    ... this works too
}

除了上面提到的使用地图的选项外,我还尝试了以下方法:

- class level kafka listener ([from here)][1]; it still requires method with param of type Map
- custom deserializer registered in object mapper?

有没有办法在我发送消息时指定目标类型?

【问题讨论】:

  • 按目标类型是指要发送的任何特定 POJO 对象吗?
  • 是的。在我的示例中,类型为 F1.. 目标是在从 kafka 到对象的转换期间,即使侦听器被声明为 Foo,它也会知道它是 F1

标签: java apache-kafka spring-kafka


【解决方案1】:

使用自定义DefaultJackson2JavaTypeMapper 配置StringJsonMessageConverter,其中包含setTypePrecedence(TypePrecedence.TYPE_ID)

还设置类型映射以根据发送类型 id 标头映射到所需的类(仅当生产者的类与消费者的类不同时才需要)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-06-26
    • 1970-01-01
    • 1970-01-01
    • 2012-01-19
    • 1970-01-01
    • 2012-01-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多