【问题标题】:How to temporarily disable a message listener如何临时禁用消息侦听器
【发布时间】:2019-09-11 22:26:14
【问题描述】:

暂时禁用消息侦听器的好方法是什么?我要解决的问题是:

  • 消息侦听器接收到 JMS 消息
  • 我在尝试处理邮件时遇到错误。
  • 我等待我的系统再次准备好处理消息。
  • 在我的系统准备好之前,我不想再收到任何消息,所以...
  • ...我想禁用消息监听器。
  • 我的系统已准备好再次处理。
  • 处理失败的消息,并确认 JMS 消息。
  • 再次启用消息监听器。

现在,我正在使用 Sun App Server。我通过在 MessageConsumer 中将其设置为 null 来禁用消息侦听器,并使用 setMessageListener(myOldMessageListener) 再次启用它,但此后我不再收到任何消息。

【问题讨论】:

    标签: java jms


    【解决方案1】:

    如果您在系统准备好再次处理消息之前不从onMessage() 侦听器方法返回怎么办?这将阻止 JMS 向该消费者传递另一条消息。

    这相当于在同步情况下不调用receive()

    给定的 JMS 会话没有多线程,因此消息管道一直保持到 onMessage() 方法返回。

    我不熟悉动态调用setMessageListener() 的含义。 javadoc 说there's undefined behavior 如果在“现有侦听器或同步消费者正在使用消息时”调用。如果您是从 onMessage() 内部调用,听起来您遇到了未定义的情况。

    在连接级别有 start/stop 方法,如果这对您来说不是太粗粒度的话。

    【讨论】:

    • 哦,原来就是这么简单。我预计会有更多线程调用 onMessage 方法。非常感谢!
    • 连接是线程安全的。关闭的会话(会话、消费者、生产者)不是线程安全的。您的代码需要避免多线程访问。为了强制执行,JMS 不能对一个消费者进行多线程侦听器调用。
    • 我刚刚注意到 JMS 规范明确指定了这种串行传递行为。这是 JMS 1.0.2 规范的第 4.4.16 节。过去我认为这只是线程规则所暗示的。
    • 我们可以使用 Thread.sleep() 直到问题得到解决吗?这是最好的设计吗?
    【解决方案2】:

    通过用 receive() 循环替换消息侦听器的解决方法解决了问题,但我仍然对如何禁用消息侦听器并很快再次启用它感兴趣。

    【讨论】:

    • 您好,您是否找到了暂停消息侦听器接收过程的解决方案
    【解决方案3】:

    在我看来,消息正在传递,但它们没有发生任何事情,因为您没有附加侦听器。自从我用 JMS 做任何事情以来已经有一段时间了,但是您不希望在修复系统时将消息发送到死信队列或其他东西,然后在您完成后将消息移回原始队列准备好再次处理了吗?

    【讨论】:

    • 可能就是这样。不幸的是,我没有对 jms 服务器的控制,我所能做的就是指定一个从中获取消息的队列,没有死信队列。我想我可以停止 QueueConnection,但这不能从消息侦听器线程中完成。
    • 你怎么没有访问JMS服务器的权限。正如 duffymo 正确指出的那样,它是您想要访问的错误队列。否则,您最终将在 JMS 服务器为您提供的代码中复制行为
    • 我确实可以访问 JMS 服务器...在我的开发环境中,但我不能期望我正在处理的应用程序的用户(两个消息传递系统之间的桥梁)拥有JMS 服务器的相同权限。死信队列是处理这些问题的常用方法吗?
    【解决方案4】:

    在 WebLogic 上,您可以设置最大重试次数、用于处理超过最大重试次数限制的消息的错误队列以及其他参数。我不确定我的头,但你也可以指定一个等待期。所有这些都可以在管理控制台中使用。我会查看您拥有的 JMS 提供程序的管理员,看看它是否可以做类似的事情。

    【讨论】:

      【解决方案5】:

      在 JBoss 中,以下代码可以解决问题:

         MBeanServer mbeanServer = MBeanServerLocator.locateJBoss();
          ObjectName objName = new ObjectName("jboss.j2ee:ear=MessageGateway.ear,jar=MessageGateway-EJB.jar,name=MessageSenderMDB,service=EJB3");
          JMSContainerInvokerMBean invoker = (JMSContainerInvokerMBean) MBeanProxy.get(JMSContainerInvokerMBean.class, objName, mbeanServer);
      
          invoker.stop(); //Stop MDB
          invoker.start(); //Start MDB
      

      【讨论】:

        【解决方案6】:

        我觉得你可以打电话

        messageConsumer.setMessageListener(null);
        

        在您的 MessageListener 实现中并安排重建任务(例如在 ScheduledExecutorService 中)。这个任务应该调用

        connection.stop();
        messageConsumer.setMessageListener(YOUR_NEW_LISTENER);
        connection.start();
        

        它会工作的。 start() 和 stop() 方法用于重新启动传递结构(不是 TCP 连接)。

        阅读 Javadoc https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#stop--

        暂时停止连接传递传入消息。可以使用连接的启动方法重新启动交付。当连接停止时,将禁止向所有连接的消息消费者传递:同步接收阻塞,并且消息不会传递给消息侦听器。

        【讨论】:

          【解决方案7】:

          要暂时停止连接传递传入消息,您需要使用来自Connection 接口的stop() 方法:https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#stop--

          只是不要从MessageListener 调用connection.stop(),因为根据JMS 规范。你会遇到死锁或异常。相反,您可以从不同的线程调用connection.stop(),您只需要同步MessageListener 和要挂起与函数connection.stop() 的连接的线程

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2010-12-04
            • 2011-01-20
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多