【发布时间】:2019-05-22 05:31:11
【问题描述】:
我正在尝试为我的 Spring Integration Kafka 消息处理程序设置一个自定义消息转换器(是的,我知道我可以提供序列化程序配置 - 我正在尝试做一些不同的事情。
我有以下几点:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
final KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setMessageConverter(new MessagingMessageConverter() {
@Override
public ProducerRecord<?, ?> fromMessage(final Message<?> message, final String s) {
LOGGER.info("fromMessage({}, {})", message, s);
return super.fromMessage(message, s);
}
});
return kafkaTemplate;
}
@Bean
@ServiceActivator(inputChannel = "kafkaMessageChannel")
public MessageHandler kafkaMessageHandler() {
final KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(getTopic()));
handler.setSendSuccessChannel(kafkaSuccessChannel());
return handler;
}
当消息发送到kafkaMessageChannel 时,处理程序发送它并且结果显示在kafkaSuccessChannel 中,但是我在模板中设置的RecordMessageConverter 从未被调用
【问题讨论】:
标签: spring-integration spring-kafka