【问题标题】:spring-integration-kafka: KafkaTemplate#setMessageConverter(RecordMessageConverter) has no effectspring-integration-kafka:KafkaTemplate#setMessageConverter(RecordMessageConverter) 没有效果
【发布时间】:2019-05-22 05:31:11
【问题描述】:

我正在尝试为我的 Spring Integration Kafka 消息处理程序设置一个自定义消息转换器(是的,我知道我可以提供序列化程序配置 - 我正在尝试做一些不同的事情。

我有以下几点:

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    final KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setMessageConverter(new MessagingMessageConverter() {
        @Override
        public ProducerRecord<?, ?> fromMessage(final Message<?> message, final String s) {
            LOGGER.info("fromMessage({}, {})", message, s);
            return super.fromMessage(message, s);
        }
    });
    return kafkaTemplate;
}

@Bean
@ServiceActivator(inputChannel = "kafkaMessageChannel")
public MessageHandler kafkaMessageHandler() {
    final KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression(getTopic()));
    handler.setSendSuccessChannel(kafkaSuccessChannel());
    return handler;
}

当消息发送到kafkaMessageChannel 时,处理程序发送它并且结果显示在kafkaSuccessChannel 中,但是我在模板中设置的RecordMessageConverter 从未被调用

【问题讨论】:

    标签: spring-integration spring-kafka


    【解决方案1】:

    模板消息转换器仅在使用出站通道适配器不使用的template.send(Message&lt;?&gt;) 时使用。

    出站适配器使用其标头映射器映射标头本身;没有对消息负载执行转换。

    哪些文档让您相信转换器是在这种情况下使用的?

    【讨论】:

    • 我认为我们需要在请求消息的payload 中添加对ProducerRecord 的支持。这样,在向KafkaProducerMessageHandler 发送消息之前,这种转换可能会提前发生。
    • 我觉得没那么简单;生产者记录的某些组件是从 MessageHeaders 构建的,例如键,分区。我认为如果有人想建立自己的ProducerRecord,他们应该使用@ServiceActivator 中的KafkaTemplate,而不是使用出站适配器。
    猜你喜欢
    • 2020-10-30
    • 1970-01-01
    • 2018-12-25
    • 1970-01-01
    • 1970-01-01
    • 2017-05-05
    • 2021-01-31
    • 2014-11-04
    • 1970-01-01
    相关资源
    最近更新 更多