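【Question title】: How to monitor multiple JMS queues
【Posted】: 2015-09-18 17:17:57
【Question】:

My application needs to monitor multiple JMS queues.

How should this be done? Should I start two threads? Is it possible to monitor two queues at the same time?

Sample code for a single queue: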

...
queue1 = session.createQueue("queue-1");
consumer = session.createConsumer(queue1);

connection.start();

while (true) {
    Message m = consumer.receive(10000);

    if (m == null) {
        ...nothing...
    } else {
        ...do something with the message...
    }
}
...

How should I watch both queue-1 and queue-2?

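【Question comments】:

  • Have you explored the JMX API?
  • I have a basic understanding of the API and its implementation. Multiple clients on a single queue is easy enough, but it is hard to find information on how to connect one client to multiple queues.
  • Can't you write a queue receiver that takes the queue name as a parameter and then, in your main (or equivalent) method, create two instances with the constructor arguments "queue-1" and "queue-2" respectively?
  • I can use a parameter and create multiple instances, but both queues have to be monitored. Should I start separate threads?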
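
The idea from the comments (one receiver class parameterized by queue name, one thread per queue) can be sketched roughly as follows. This is only an illustration under stated assumptions: `QueueMonitorSketch`, `QueueReceiver`, and `demo` are hypothetical names, and a `java.util.concurrent.BlockingQueue` stands in for the JMS `MessageConsumer` so the sketch runs without a broker. `poll(timeout)` plays the role of `consumer.receive(timeout)`, returning null when the timeout expires.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMonitorSketch {

    /** One receiver per queue; the queue name is a constructor argument. */
    static class QueueReceiver implements Runnable {
        private final String queueName;
        private final BlockingQueue<String> source; // stand-in for a JMS MessageConsumer
        private final List<String> handled;
        private volatile boolean running = true;

        QueueReceiver(String queueName, BlockingQueue<String> source, List<String> handled) {
            this.queueName = queueName;
            this.source = source;
            this.handled = handled;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    // Like consumer.receive(timeout): null means nothing arrived in time
                    String message = source.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        handled.add(queueName + ":" + message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void stop() {
            running = false;
        }
    }

    /** Starts one thread per queue, sends one message to each, returns what was handled. */
    static List<String> demo() {
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        QueueReceiver r1 = new QueueReceiver("queue-1", q1, handled);
        QueueReceiver r2 = new QueueReceiver("queue-2", q2, handled);
        Thread t1 = new Thread(r1, "queue-1-receiver");
        Thread t2 = new Thread(r2, "queue-2-receiver");
        t1.start();
        t2.start();
        try {
            q1.put("a");
            q2.put("b");
            // Wait (bounded) until both receivers have picked up their message
            long deadline = System.currentTimeMillis() + 5000;
            while (handled.size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            r1.stop();
            r2.stop();
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With real JMS, each receiver would presumably create its own session and consumer, since a JMS `Session` is not meant to be used from several threads at once.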

Tags: java multithreading queue jms


【Solution 1】:

You can use the Quartz Scheduler for this. Implement one (or more) Quartz jobs like this:

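import javax.jms.JMSException;
import javax.jms.Message;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MessageReaderJob1 implements Job {
    private QueueReader1 qr;

    @Override
    public synchronized void execute(JobExecutionContext arg0) throws JobExecutionException {
        qr = QueueReader1.getInstance();

        try {
            // receiveNoWait() returns null immediately if no message is available
            Message message = qr.getConsumer().receiveNoWait();
            ....
        } catch (JMSException e) {
            throw new JobExecutionException(e);
        }
    }
}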

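You then need a scheduler, which you start from your application (a main method or a servlet). Note that you can also define a different trigger for the second queue: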

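import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class TestCasesSchedule {

    private Scheduler scheduler;

    public void createSchedule() {
        // Job keys must be unique within a group, so each job gets its own name
        JobDetail job1 = JobBuilder.newJob(MessageReaderJob1.class)
                .withIdentity("queue1job", Scheduler.DEFAULT_GROUP)
                .build();

        JobDetail job2 = JobBuilder.newJob(MessageReaderJob2.class)
                .withIdentity("queue2job", Scheduler.DEFAULT_GROUP)
                .build();

        try {
            SchedulerFactory sf = new StdSchedulerFactory();
            scheduler = sf.getScheduler();
            scheduler.start();
            // A trigger belongs to exactly one job, so build one per job
            scheduler.scheduleJob(job1, newTrigger("queue1trigger"));
            scheduler.scheduleJob(job2, newTrigger("queue2trigger"));
        } catch (SchedulerException se) {
            System.err.println(se.getMessage());
        }
    }

    // Fires immediately, then every 5 minutes, forever
    private static Trigger newTrigger(String name) {
        return TriggerBuilder.newTrigger()
                .withIdentity(name, "triggergroup")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInMinutes(5)
                        .repeatForever())
                .build();
    }
}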
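
For comparison, the same periodic-polling idea can be sketched with the JDK's `ScheduledExecutorService` instead of Quartz. This is not the approach above, just a dependency-free illustration: `PollingSketch` and `drain` are hypothetical names, a `BlockingQueue` stands in for the JMS consumer, and the non-blocking `poll()` plays the role of `receiveNoWait()`. One deliberate difference: each run drains everything currently available, because a job that calls `receiveNoWait()` only once per run would handle at most one message per trigger interval.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingSketch {

    /**
     * Handles every message that is available right now without blocking,
     * and returns how many were handled.
     */
    static int drain(String queueName, BlockingQueue<String> source, List<String> handled) {
        int count = 0;
        String message;
        // poll() returns null as soon as the queue is empty (like receiveNoWait())
        while ((message = source.poll()) != null) {
            handled.add(queueName + ":" + message);
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();
        q1.add("a");
        q1.add("b");
        q2.add("c");
        List<String> handled = new CopyOnWriteArrayList<>();

        // One periodic task per queue; the 200 ms delay is an arbitrary demo value
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.scheduleWithFixedDelay(
                () -> drain("queue-1", q1, handled), 0, 200, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(
                () -> drain("queue-2", q2, handled), 0, 200, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(handled);
    }
}
```

Polling on a schedule trades latency for simplicity: a message can wait up to one full interval before it is picked up.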

The QueueReader for one of your queues looks like this:

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QueueReader1 {

    private MessageConsumer consumer = null;
    private Context jndiContext = null;
    private QueueConnectionFactory queueConnectionFactory = null;
    private QueueConnection queueConnection = null;
    private QueueSession queueSession = null;
    private Queue queue = null;

    // Singleton: one reader (and one JMS connection) for this queue
    private static final QueueReader1 instance = new QueueReader1();

    public static synchronized QueueReader1 getInstance() {
        return instance;
    }

    // Used by the Quartz job to poll the queue
    public MessageConsumer getConsumer() {
        return consumer;
    }

    private QueueReader1() {

        /*
         * Create a JNDI API InitialContext object if none exists
         * yet.
         */
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }

        /*
         * Look up connection factory and queue.  If either does
         * not exist, report the error.
         */
        try {
            queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("connection_factory_name");
            queue = (Queue) jndiContext.lookup("queue_name");
            queueConnection =
                    queueConnectionFactory.createQueueConnection();
            queueSession =
                    queueConnection.createQueueSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            consumer = queueSession.createConsumer(queue);

            queueConnection.start();

        } catch (JMSException ex) {
            System.err.println(ex.getMessage());
        } catch (NamingException e) {
            System.err.println(e.getMessage());
        }
    }

}

【Discussion】:

    【Solution 2】:

    Here is my solution. It works. Any further suggestions are welcome!

    Main class:

    public class Notifier {
        public static void main(String[] args) throws Exception {
            // Start a thread for each JMS queue to monitor.
            DestinationThread destination1 = new DestinationThread("queue1");
            DestinationThread destination2 = new DestinationThread("queue2");
            destination1.start();
            destination2.start();
        }
    }
    

    Thread class:

    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class DestinationThread extends Thread {

        private final String destinationQueue;

        // Instance fields, not static: each thread needs its own connection,
        // session and consumer, otherwise the two threads overwrite each other's.
        private ActiveMQConnectionFactory connectionFactory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageConsumer consumer = null;

        public DestinationThread(String destinationQueue) {
            this.destinationQueue = destinationQueue;
        }

        @Override
        public void run() {
            try {
                initializeThread(destinationQueue);
                startThread(destinationQueue);
            } catch (Exception e) {
                //TODO
            }
        }

        // Closes the previous connection (and with it its session and consumer), if any
        private void closeConnection() {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                }
                connection = null;
            }
        }

        // Retries every 30 seconds until the connection and consumer are set up
        private void initializeThread(String destinationQueue) {
            boolean connectionMade = false;
            while (!connectionMade) {
                try {
                    closeConnection();
                    connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                    connection = connectionFactory.createConnection();
                    connection.start();
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    destination = session.createQueue(destinationQueue);
                    consumer = session.createConsumer(destination);
                    connectionMade = true;
                } catch (JMSException e) {
                    //TODO
                    try {
                        Thread.sleep(30000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
        }

        private void startThread(String destinationQueue) throws Exception {
            while (true) {
                try {
                    Message message = consumer.receive(300000);
                    if (message == null) {
                        //No message received for 5 minutes - Re-initializing the connection
                        initializeThread(destinationQueue);
                    } else if (message instanceof TextMessage) {
                        if (destinationQueue.equals("queue1")) {
                            //Message received from queue1 - do something with it
                        } else if (destinationQueue.equals("queue2")) {
                            //Message received from queue2 - do something with it
                        } else {
                            //nothing
                        }
                    } else {
                        //nothing
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
    }
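
As the number of queues grows, the `if`/`else` chain on the queue name could be replaced by a lookup table of handlers. The following is a minimal sketch of that idea, not part of the solution above: `QueueDispatchSketch`, `register`, and `dispatch` are hypothetical names, and the message is represented by its text (for a `TextMessage`, the result of `getText()`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class QueueDispatchSketch {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler to run for messages arriving on the given queue. */
    public void register(String queueName, Consumer<String> handler) {
        handlers.put(queueName, handler);
    }

    /** Runs the handler registered for the queue; returns false if there is none. */
    public boolean dispatch(String queueName, String text) {
        Consumer<String> handler = handlers.get(queueName);
        if (handler == null) {
            return false;
        }
        handler.accept(text);
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        QueueDispatchSketch dispatcher = new QueueDispatchSketch();
        dispatcher.register("queue1", text -> log.add("q1 got " + text));
        dispatcher.register("queue2", text -> log.add("q2 got " + text));

        dispatcher.dispatch("queue1", "hello");
        dispatcher.dispatch("queue2", "world");
        System.out.println(log);
    }
}
```

Because the map is a plain `HashMap`, all handlers should be registered before the consumer threads are started, so that the threads only ever read it.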
    

    【Discussion】:
