【Question title】: Better way to wait to receive Async messages in ActiveMQ
【Posted】: 2022-02-14 21:10:36
【Question】:

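I use ActiveMQ to send and receive messages asynchronously.

I am having trouble deciding on the best way to wait for messages. Sleeping the thread in a loop is one option, but it does not feel right to me.

Can anyone suggest a better approach?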

AsyncReceiver.java

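import java.util.Properties;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

public class AsyncReceiver implements MessageListener, ExceptionListener{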

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();                                  
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue","MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

        QueueReceiver queueReceiver = queueSession.createReceiver(queue);
        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);
        queueConn.setExceptionListener(asyncReceiver);
        queueConn.start();

        // Waiting for messages
        System.out.print("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
        }

        queueConn.close();
    }

    public void onMessage(Message message){
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("received: " + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    public void onException(JMSException exception){
        System.err.println("an error occurred: " + exception);
    }
}

Sender.java

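import java.util.Properties;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Sender{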

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);

        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = queueSession.createTextMessage("Hello");
        queueSender.send(message);
        System.out.println("sent: " + message.getText());

        queueConn.close();
    }
}

【Comments】:

    Tags: java asynchronous jms activemq


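    【Solution 1】:

    There are two ways to handle/consume messages from a queue.

    1. Poll the queue periodically for new messages. This fits a program that runs on a schedule (e.g. twice a day or once a day). You can implement it as a loop with some thread sleeps.

    2. Register a consumer on the queue (using a MessageListener). You can do this as in the following example.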

    Consumer.java

            // URL and queueName are placeholders for your broker URL and queue name.
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
            javax.jms.Connection connection = connectionFactory.createConnection();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);

            // Register the listener first, then start the connection
            // (the usual JMS setup order).
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new YourClass());
            connection.start();
    

    YourClass.java

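    public class YourClass implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String inputJsonString = textMessage.getText();
                // do whatever you want with inputJsonString

                // Only needed with CLIENT_ACKNOWLEDGE; the call is ignored
                // when the session uses AUTO_ACKNOWLEDGE as above.
                message.acknowledge();
            } catch (JMSException e) {
                // onMessage cannot throw checked exceptions, so handle them here
                e.printStackTrace();
            }
        }
    }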
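
With a listener registered, the main thread still needs something to block on instead of a sleep loop. One common option is a java.util.concurrent.CountDownLatch that the listener counts down. The following is only a minimal sketch under assumptions: CountingListener is a hypothetical stand-in for a MessageListener, and a plain thread simulates the JMS delivery thread so that the example runs without a broker. In real code, the countDown() call would go inside onMessage(Message).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    // Hypothetical stand-in for a JMS MessageListener:
    // it counts the latch down once per delivered message.
    static class CountingListener {
        private final CountDownLatch latch;

        CountingListener(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        void onMessage(String text) {
            System.out.println("received: " + text);
            latch.countDown();
        }

        // Blocks until all expected messages arrived or the timeout elapsed.
        // Returns true if every expected message was received in time.
        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    public static void main(String[] args) throws Exception {
        CountingListener listener = new CountingListener(3);

        // Simulates the provider's delivery thread calling the listener.
        Thread provider = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                listener.onMessage("Hello " + i);
            }
        });
        provider.start();

        // The main thread waits here without polling or sleeping in a loop.
        boolean allReceived = listener.await(10, TimeUnit.SECONDS);
        System.out.println(allReceived ? "all messages received" : "timed out");
        // In the JMS version, this is where the connection would be closed.
    }
}
```

For a receiver that should run until it is stopped rather than until a fixed number of messages has arrived, the same idea works with a latch of count 1 that is counted down from a shutdown hook or another stop signal.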

    【Comments】:
