【问题标题】:JEE7 + WildFly (HornetQ) - Pause queue from applicationJEE7 + WildFly (HornetQ) - 应用程序暂停队列
【发布时间】:2015-07-08 15:51:29
【问题描述】:

我们使用 WildFly + HornetQ 作为我们的应用服务器和 JMS 消息队列,并且要求能够从应用程序中暂停/恢复队列。这可能吗?

【问题讨论】:

    标签: java jms wildfly-8 java-ee-7 hornetq


    【解决方案1】:

    您是否正在寻找停止和开始传递消息的方法?如果是这样,那么 JMS 定义 connection.Stop 方法来暂停消息的传递。可以使用connection.Start 方法恢复消息传递。

    所以 HornetQ JMS 客户端将实现这些方法。您将需要使用这些方法。

    【讨论】:

      【解决方案2】:

      这可以使用 JMX 或使用 hornetq 核心管理 api 来完成。

      在本示例中,使用 wildfly 8.1.0.Final 运行 Standalone-full-ha 配置文件。

      所需的 Maven 依赖项:

          <dependency>
              <groupId>org.hornetq</groupId>
              <artifactId>hornetq-jms-client</artifactId>
              <version>2.4.1.Final</version>
          </dependency>
      
          <dependency>
              <groupId>org.wildfly</groupId>
              <artifactId>wildfly-jmx</artifactId>
              <version>8.1.0.Final</version>
          </dependency>
      

      这是一个测试类,演示了通过 JMX 使用 JmsQueueControl:

      package test.jmx.hornetq;
      
      import org.hornetq.api.jms.management.JMSQueueControl;
      
      import javax.management.*;
      import javax.management.remote.JMXConnector;
      import javax.management.remote.JMXConnectorFactory;
      import javax.management.remote.JMXServiceURL;
      
      public class WildflyJmsControl {
      
          public static void main(String[] args) {
              try {
                  //Get a connection to the WildFly 8 MBean server on localhost
                  String host = "localhost";
                  int port = 9990;  // management-web port
                  String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port);
                  JMXServiceURL serviceURL = new JMXServiceURL(urlString);
                  JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null);
                  MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
      
                  String queueName = "testQueue"; // use your queue name here
      
                  String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
                  ObjectName objectName = ObjectName.getInstance(mbeanObjectName);
      
                  JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false);
                  assert jmsQueueControl != null;
      
                  long msgCount = jmsQueueControl.countMessages(null);
      
                  System.out.println(mbeanObjectName + " message count: " + msgCount);
      
                  jmsQueueControl.pause();
                  System.out.println("queue paused");
      
                  jmsQueueControl.resume();
                  System.out.println("queue resumed");
      
                  jmxConnector.close();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      

      要通过 JMS 访问 hornetq 管理,请使用:

      package test.jms.hornetq;
      
      import org.hornetq.api.core.TransportConfiguration;
      import org.hornetq.api.core.client.*;
      import org.hornetq.api.core.management.ManagementHelper;
      import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
      
      public class HornetqService {
      
          public void testPauseResumeQueue() {
              // this class needs to run in the same jvm as the wildfly server (i.e. not a remote jvm)
              try {
                  ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
                          InVMConnectorFactory.class.getName()));
      
                  ClientSession session = locator.createSessionFactory().createSession();
      
                  session.start();
      
                  ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");
      
                  String queueName = "testQueue"; // use your queue name here
      
                  // get queue message count
                  ClientMessage message = session.createMessage(false);
                  ManagementHelper.putAttribute(message, queueName, "messageCount");
      
                  ClientMessage reply = requester.request(message);
                  int count = (Integer) ManagementHelper.getResult(reply);
                  System.out.println("There are " + count + " messages in exampleQueue");
      
                  // pause the queue
                  message = session.createMessage(false);
                  ManagementHelper.putOperationInvocation(message, queueName, "pause");
      
                  requester.request(message);
      
                  // get queue paused
                  message = session.createMessage(false);
                  ManagementHelper.putAttribute(message, queueName, "paused");
                  reply = requester.request(message);
                  Object result =  ManagementHelper.getResult(reply);
                  System.out.println("result: " + result.getClass().getName() + " : " + result.toString());
      
                  // resume queue
                  message = session.createMessage(false);
                  ManagementHelper.putOperationInvocation(message, queueName, "resume");
                  requester.request(message);
      
                  // get queue paused
                  message = session.createMessage(false);
                  ManagementHelper.putAttribute(message, queueName, "paused");
                  reply = requester.request(message);
                  Object result2 =  ManagementHelper.getResult(reply);
                  System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());
      
                  requester.close();
      
                  session.close();
              }catch (Exception e){
                  System.out.println("Error pausing queue" + e.getMessage());
              }
          }
      }
      

      【讨论】:

      • 嗨,Euan,刚刚看到你的答案。我一直在尝试确定队列是否已暂停,为此我尝试使用 ManagementHelper api 和您建议的代码中的“暂停”操作。但是,我在运行它时得到以下信息:Queue paused: HQ119069: no operation paused/0。似乎“暂停”不是受支持的操作。你能帮我解决这个问题吗?我错过了什么吗?
      • 嗨 Gravetii。没有看到你的代码很难说。 ClientSession 建好后你启动了吗?我发现有人有类似的问题,但属性不同。查看:link 如果这不起作用,您能否打开一个新问题并将链接发送给我,我可以查看您的代码
      猜你喜欢
      • 1970-01-01
      • 2021-02-12
      • 1970-01-01
      • 2013-08-28
      • 1970-01-01
      • 1970-01-01
      • 2013-09-12
      • 1970-01-01
      • 2019-05-06
      相关资源
      最近更新 更多