【问题标题】:Spring integration inbound-gateway Fire an event when queue is emptySpring集成入站网关当队列为空时触发事件
【发布时间】:2017-05-09 09:57:51
【问题描述】:

我是新手,但我会尽量保持谨慎。

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(激活器){HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

我遇到了一种情况,我必须像前者一样动态更改流程中的路由条件。来自队列的消息被发送到一个激活器进行处理,或者另一个队列被搁置。在某些时候,我必须关闭 INBOUND-GATEWAY-1 以便没有新消息进入流程,并打开 INBOUND-GATEWAY-2 以处理来自 HOLD QUEUE 的所有消息。一旦来自 HOLD QUEUE 的所有消息都被消费完,两个网关都必须像以前一样关闭/打开。这里的问题是,我如何知道 HOLD QUEUE 何时为空,以便触发可以启动 gateway-1 的方法?

如果有人可以帮助我,我将不胜感激。

提前致谢

【问题讨论】:

    标签: routing jms spring-integration


    【解决方案1】:

    经过一番调试和阅读,我终于找到了解决这个问题的办法。入站网关是一个 JmsMessageDrivenEndpoint,它基于两个内部组件,一个 MessageListenerContainer 和一个 MessageListener。 MessageListenerContainer 是负责调度 MessageListener 行为的人,因此,覆盖 noMessageReceived 和 messageReceived,并添加一些属性来控制所需的行为,我可以做到这一点。

    我的 MessageListenerContainer 实现是这样的。

    public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{
    
        private JmsMessageDrivenEndpoint mainInputGateway;
    
        private long timeOut;
    
        private long lastTimeReceived;  
    
        public PassControlMessageListenerContainer() {
            this.setAutoStartup(false);
        }
    
        @Override
        public void start() throws JmsException {
            /*When the container is started the lastTimeReceived is set to actial time*/
            lastTimeReceived = (new Date()).getTime();
            super.start();
        }
    
        @Override
        protected void noMessageReceived(Object invoker, Session session) {
            long actualTime = (new Date()).getTime();
    
            if((actualTime - lastTimeReceived) >= timeOut 
                    && mainInputGateway != null && !mainInputGateway.isRunning()){
                mainInputGateway.start();
            }       
            super.noMessageReceived(invoker, session);
        }
    
        @Override
        protected void messageReceived(Object invoker, Session session) {
            /*lastTimeReceived is set again to actual time at new message arrive*/
            lastTimeReceived = (new Date()).getTime();
            super.messageReceived(invoker, session);
        }
    }
    

    最后,spring bean 配置如下:

    <bean id="listenerContainer" 
        class="org.merol.ControlMessageListenerContainer">
        <property name="mainInputGateway" ref="mainGateway" />
        <property name="destination" ref="onHoldQueue" />
        <property name="timeOut" value="10000"/>
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <bean id="messageListener" 
        class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
        <property name="requestChannel" ref="outputChannel" />
    </bean>
    
    <bean id="inboundGateway" 
        class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
        <constructor-arg name="listenerContainer" ref="listenerContainer" />
        <constructor-arg name="listener" ref="messageListener" />
    </bean>
    

    希望这对其他人有帮助。

    感谢@Nicholas 提供线索。

    【讨论】:

      【解决方案2】:

      我会将此功能放入入站网关处理器中。例如:

      Gateway1Processor:

      • start():从主服务器启动消费者 排队和处理。
      • stop():停止消费者。

      Gateway2Processor:

      • start():启动消费者离开 HOLD 队列。指定适当的超时。当超时触发时,(HOLD 队列为空)调用 stop()
      • stop():启动 Gateway1Processor 并停止此消费者。

      因此,操作顺序为:

      1. 启动 Gateway1Processor
      2. 某个时间,调用Gateway1Processor.stop()Gateway2Processor.start()
      3. Gateway2Processor 将耗尽 HOLD 队列,重新启动 Gateway1Processor 然后停止。
      4. 转到 #2。

      【讨论】:

      • 如果我理解你的要求正确,我认为这可以使用 org.springframework.jms.listener.DefaultMessageListenerContainer 或 org.springframework.jms.listener.SimpleMessageListenerContainer 来实现。
      • 非常感谢您的快速回复,@Nicholas。我只是对此有疑问:&lt;jms:inbound-gateway&gt; 标签使用的类是什么?我的意思是,我想我必须自己实现和/或覆盖入站网关的方法,所以我可以用类似 &lt;bean class="myImplementedGateway"&gt; &lt;property name="requestChannel" ref=""/&gt; ... &lt;/bean&gt; 的东西替换这个 &lt;jms:inbound-gateway id="cancel.book.dispatcher" request-channel="receiverBookChannel" request-destination="cancel.wait.book.engine" auto-startup="false"/&gt;
      猜你喜欢
      • 2014-10-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-06-08
      • 1970-01-01
      • 2014-11-08
      相关资源
      最近更新 更多