【发布时间】:2021-04-21 14:56:47
【问题描述】:
我正在使用 Spring 集成框架,带有一个 Transformer
inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 编写器
@Bean
public DirectChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@Transformer(inputChannel="inboundChannel", outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(Item.class);
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new ...
return ...;
}
@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
return new ...;
}
在我覆盖的两个处理程序中
public void handleMessage(Message<?> message)
问题:如果在 kafka 中总共有 N 条消息, 然后每个 handleMessage() 都被调用 n/2 次!
我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道,并且总共有 n 条消息。
我错过了什么?
(如果我禁用 kafak 处理程序,第二个处理程序会获取所有 n 条消息)
更新:
我需要订阅者从同一频道获取所有消息(kafka 处理程序将对原始数据进行处理,jdbc 处理程序将推送转换后的
数据)
【问题讨论】:
标签: spring-integration spring-kafka spring-jdbc