【问题标题】:is it possible to create a queue listener using web flux spring integration?是否可以使用 Web Flux Spring 集成创建队列侦听器?
【发布时间】:2021-05-13 20:38:16
【问题描述】:
    @Component
    @RequiredArgsConstructor
    public class EventListener {
    
        private final EventProcessingService eventProcessingService;

        @JmsListener(destination = "inputQueue", constainerFactory = "myContainerFactory)
        public void receiveMessage(Message message) {
             eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
        }
    }
    


     @Service
     public class EventProcessingService {

         public Mono<Void> doSome(Message message) {
            //...
         }
     }
  
    @Configuration
    @RequiredArgsConstructor
    public class MqIntegration {
        
        private final ConnectionFactory connectionFactory;

        @Bean
        public Publisher<Message<String>> mqReactiveFlow() {
            return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .toReactivePublisher();
        }
    }

我有一些与 ibm mq 交互的 webflux 应用程序和一个 JmsListener,它在收到消息时侦听来自队列的消息 EventProcessingService 根据消息向其他服务发出请求。 我想知道如何使用 Spring Integration 创建一个与反应线程一起使用的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在收到消息时调用 EvenProcessingService,这样它就不会对 webflux 应用程序内的线程产生负面影响

【问题讨论】:

    标签: spring-boot spring-integration ibm-mq spring-webflux spring-jms


    【解决方案1】:

    我认为您将不得不绕过@JmsListener,因为它正在注册一个 on 消息,尽管异步不会是被动的。 JMS 本质上是阻塞的,因此在顶部修补一个响应层将只是一个补丁。

    您将需要使用您创建的Publisher 来生成背压。我认为您将不得不定义和实例化您自己的侦听器 bean,它在以下行中执行某些操作:

        public Flux<String> mqReactiveListener() {
            return Flux.from(mqReactiveFlow())
                    .map(Message::getPayload);
        }
    

    【讨论】:

      【解决方案2】:

      我认为我们需要清理您问题中的一些要点。

      1. WebFlux 本身并不是一个项目。它是基于响应式服务器的关于 Web 的 Spring Framework 模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
      2. @JmsListener 是另一个 Spring Framework 模块的一部分 - spring-jms。并且与响应式服务器用于 WebFlux 层的线程没有任何关系。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms
      3. Spring Integration 是一个独立的项目,它在 Spring Framework 依赖注入容器之上实现 EIP。在 Spring Framework 的 WebFlux API 之上,它确实有自己的用于通道适配器的 WebFlux 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux。它还在 Spring Framework 的 JMS 模块之上有一个 JMS 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。但是,与 @JmsLisntener 没有任何关系,因为它的 Jms.messageDrivenChannelAdapter() 完全涵盖了该功能,并且从很大的角度来看,它也是以同样的方式实现的——通过 MessageListenerContainer

      所有这些都可能与问题无关,但最好清楚地了解您所问的内容,这样我们就会觉得我们与您在同一页上。

      现在试图回答您的问题。

      只要不从 WebFlux 层(@RequestMappingWebFlux.inboundGateway())处理 JMS,就不会影响那些非阻塞线程。 JMS MessageListenerContainer 产生自己的线程并执行从队列中拉取和消息处理。

      您对 JMS 配置和服务的解释看起来更像这样:

       @Bean
       public IntegrationFlow mqReactiveFlow() {
             return IntegrationFlows
                   .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                            .destination("testQueue"))
                   .handle(this.eventProcessingService)
                   .nullChannel();
       }
      

      确实没有理由在 JMS 之后将消息转移到 QueueChannel,因为 JMS 侦听已经是一个异步操作。

      我们需要 nullChannel 在您的流程结束只是因为您的服务方法返回 Mono 并且框架不知道如何处理它。从版本5.4.3 开始, NullChannel 能够订阅生成给它的消息的Publisher 有效负载。

      您可以在两者之间使用FluxMessageChannel 来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。

      【讨论】:

      • 非常感谢,这正是我想要的
      猜你喜欢
      • 1970-01-01
      • 2012-06-21
      • 1970-01-01
      • 2019-01-16
      • 2016-07-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多