【问题标题】:Spring Integration - 2 MessageHandlers handle half of messagesSpring Integration - 2 个 MessageHandler 处理一半的消息
【发布时间】:2021-04-21 14:56:47
【问题描述】:

我正在使用 Spring 集成框架,带有一个 Transformer

inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 编写器

@Bean
public DirectChannel inboundChannel() {
    return new DirectChannel();
}

   @Bean
    public DirectChannel outboundChannel() {
        return new DirectChannel();
    }

@Bean
@Transformer(inputChannel="inboundChannel", outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
    return new JsonToObjectTransformer(Item.class);
}




@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
    JdbcMessageHandler jdbcMessageHandler = new ...

    return ...;
}

@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
    return new ...;
}

在我覆盖的两个处理程序中

public void handleMessage(Message<?> message)

问题:如果在 kafka 中总共有 N 条消息, 然后每个 handleMessage() 都被调用 n/2 次!

我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道,并且总共有 n 条消息。

我错过了什么?

(如果我禁用 kafak 处理程序,第二个处理程序会获取所有 n 条消息)

更新:
我需要订阅者从同一频道获取所有消息(kafka 处理程序将对原始数据进行处理,jdbc 处理程序将推送转换后的 数据)

【问题讨论】:

    标签: spring-integration spring-kafka spring-jdbc


    【解决方案1】:

    首先,您的inboundChanneloutboundChannel 已不再使用:您无处(至少在问题中)指定它们的名称。

    inputoutput 之类的名称被框架采用并用于创建新的 MessageChannel bean,这些 bean 在其他地方使用。

    现在看看你有什么:

    @Transformer(inputChannel="input"
    @ServiceActivator(inputChannel = "input")
    

    他们都是同一个input 频道的订阅者,因为它是由框架自动创建的DirectChannel。此通道基于循环 LoadBalancingStrategy,因此您会在 Kafka 中看到 n/2,因为它的服务激活器仅处理发送到该 input 通道的每一秒消息。

    请在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/core.html#channel-configuration-directchannel

    【讨论】:

    • 好的,首先抱歉混淆了,名称有点不同,我为简单起见更改了它们,没有意识到它们被系统使用。应该是 outboundChannel inboundChannel。我解决了这个问题。现在,我可以从 2 个来源订阅同一个频道并“发布”消息吗?
    • 当然可以,但它仍然是DirectChannel,具有默认的循环平衡策略。如果您想将相同的消息分发给多个订阅者,请考虑改用PiblishSubscribeChannel
    猜你喜欢
    • 2014-10-23
    • 1970-01-01
    • 1970-01-01
    • 2015-02-15
    • 2018-02-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-03
    • 2023-03-29
    相关资源
    最近更新 更多