【问题标题】:Delayed Inbound Adapter and Control Bus延迟入站适配器和控制总线
【发布时间】:2019-12-09 14:48:03
【问题描述】:

我的集成流程代码是:

@Bean
    public IntegrationFlow messageFlow() {
        return IntegrationFlows.from(stompInboundChannelAdapter())
                .transform(inBoundStompMsgTransformer::transform)
                .headerFilter("stomp_subscription","content-length")
                .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
                .get();
    }

我正在使用 Spring Boot。

日志清除表明{transformer}订阅者已添加到输入频道

2019-12-09 18:21:41.752  INFO 18248 --- [           main] o.s.i.s.i.StompInboundChannelAdapter     : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'

但是,我遇到了一个异常,并且丢失了队列中的前一/两条消息。它处理剩余的消息。

假设在我启动应用程序之前队列中有 10 条消息。在我启动应用程序后,即使日志显示已添加订阅者并且已启动 bean,我也会收到异常,发布异常,处理 8/9 消息。

例外是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage

很明显,上下文还没有完全准备好处理消息,因此出现了异常。 但日志消息具有误导性。

我的第一个问题:

  1. 那么添加订阅者并启动 bean 究竟意味着什么?这是否意味着一切都已设置好,但上下文仍必须准备好处理消息?

为了克服这个问题,正如许多帖子中所建议的那样,我使用控制总线来启动适配器。代码是:

......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {

    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public void start() {
        System.out.println("Service starting...");
        controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
    }
.....

我创建了public class ApplicationLifeCycle implements SmartLifecycle 认为它会很方便。

我的第二个问题是:

  1. 这是使用控制总线处理适配器启动/停止的正确/最佳方式吗?如果不是正确的方法,请告诉我正确的方法。

谢谢,

马赫什

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    我认为这是您其他问题的延续:IntegrationFlow Amqp Channel Adapter is not working in handle()

    你有这个:

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
        adapter.setOutputChannel(stompInputChannel());
        adapter.setPayloadType(ByteString.class);
        return adapter;
    }
    

    你没有在这里显示。

    问题是你在IntegrationFlow 中使用了相同的定义。原来StompInboundChannelAdapter bean 更早启动,然后IntegationFlow 被处理,.transform(inBoundStompMsgTransformer::transform) 被订阅处理传入消息。

    因此,如果您将 @BeanstompInboundChannelAdapter() 中删除,它应该可以正常工作。后面我看看为什么MessageProducerSupport启动得更早,然后IntegrationFlows...

    【讨论】:

    • 是的,您的假设是正确的。它是我另一个问题的延续。我将尝试删除 Stomp Inbound Adapter 的 @Bean。为了我的知识增长,你对我的第二个问题有什么建议/答案?请告诉我。
    • 好吧,当然,您可以在需要时注入该通道适配器,并从您的逻辑中手动调用它的stop()/start()。当stop()/start() 是消息处理的一部分时,Control Bus 用于流逻辑。所以,如果你的逻辑是用 Spring Integration 和消息实现的,最好将 stopstart 命令发送到控制总线的某个通道,但是如果你的逻辑不在 Spring Integration 中,没有理由只引入消息传递用于发送stop/start 命令。
    • 取出@Bean 作品。将 Autostartup 设置为 false 并使用控制总线启动它也可以。我将使用控制总线选项。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-11-03
    • 1970-01-01
    • 2011-03-14
    • 2012-09-03
    • 1970-01-01
    • 2015-03-04
    • 1970-01-01
    相关资源
    最近更新 更多