【问题标题】:How can I push messages from Activemq to Java Application如何将消息从 Activemq 推送到 Java 应用程序
【发布时间】:2013-08-10 06:54:22
【问题描述】:

开发人员您好,

我想写 2 个.java 文件,使用JMS 库名称是MessageProducerMessageConsumer

我在lib 文件夹中添加了activemq-all-5.8.0.jarcommons-io-2.4.jar 文件。我将Activemq 的端口号从61616 更改为61617

使用MessageProducer.java 文件,我将向Activemq 发送消息。为此,我编写了代码,它工作正常。如果您想查看,请单击此Link

我想从ActivemqMessageConsumer.java 发送消息。这是应用程序在Apache Tomcat(http://localhost:8080/ExecutableFileProcess/MessageConsumer)

一旦MessageConsumer 收到消息,它会将消息正文与消息分开,并仅在控制台上打印(仅用于我的测试)。为此,我编写了以下 2 个java 文件。但它不起作用。

MessageConsumer.java:

 package PackageName;
 import java.io.IOException;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.activemq.ActiveMQConnectionFactory;
 public class MessageConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
        throws ServletException, IOException {
try {
    //creating connectionfactory object for way
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
    //establishing the connection b/w this Application and Activemq
    Connection connection=connectionFactory.createConnection();
    Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue=session.createQueue("MessageTesing");
    javax.jms.MessageConsumer consumer=session.createConsumer(queue);
    //fetching queues from Activemq
    MessageListener listener = new MyListener();
    consumer.setMessageListener(listener);
    connection.start();
} catch (Exception e) {
    // TODO: handle exception
}

}
}

MyListener.java:

package PackageName;
import javax.jms.Message;
import javax.jms.MessageListener;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
    System.out.println(msg);
}
};

我没有在Activemq console 中为队列配置目的地,也没有在从MessageProducer.java 发送消息时提及“目的地”。

我正在使用Eclipse。如何在控制台中打印messagebody,实际上基于messagebody我会在我的MessageConsumer.java中进行一些操作。但是对于我的测试我需要查看messagebody。

我希望,你明白我在尝试什么。

我是JMSJava的新手,你能解释清楚吗?到目前为止我用谷歌搜索写了代码。但是我没有找到这个问题。

谁能推荐我。

谢谢。

【问题讨论】:

标签: java jms activemq java-ee-6


【解决方案1】:

您的程序可能刚刚启动和终止,因为您已经编写了客户端以异步接收消息(通过使用MessageListener,在一个不同的线程中),而您的主线程刚刚终止。

试试这样的:

                connection.start();

                System.out.println("Press a key to terminate");
                try {
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                System.out.println("End");

这应该让你的程序一直运行,直到你按下一个键。


要阅读TextMessage 的正文,请使用类似这样的内容(引用自ActiveMQ hello world examples

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }

【讨论】:

  • 如果我运行任何程序,我会收到 3 条警告消息。在此链接 http://stackoverflow.com/questions/18129668/how-can-i-remove-warning-messages-in-activemq 中说明了所有内容。我将您的代码附加到我的程序中,第一次运行正常。我第二次尝试从这里它不起作用。我认为是因为那个警告。我该如何解决这个问题。
  • 当程序在第一次运行期间收到消息时,这些消息被消耗掉(之后队列为空)。您必须发送一些新消息。警告消息与 log4j 相关,这应该不是问题。
  • 刚才我明白了,但是如何修复log4j文件中的警告消息
  • @user2642355 我会看看另一个问题。但是您在这个问题上需要更多帮助吗?
  • 是的,我想从消息中获取消息正文。我没有找到任何方法。我试过System.out.println(msg.getJMSMessageID());。但我得到了messageId。如果您知道是否有任何功能可以从消息中获取消息正文。
【解决方案2】:
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class ActiveMQConnection {

    private static final long timeout = 3000L;

    private Logger logger = Logger.getLogger(ActiveMQConnection.class);

    private String activeMQUser; 
    private String activeMQPassword;
    private String activeMQURI;
    private String activeMQQueueName;

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private boolean isConnected;

    private boolean transacted = false;
    private Destination destinationQueue;


    private boolean isQueueAvailable;
    private boolean isProducerAvailable;
    private boolean isConsumerAvailable;

    private MessageProducer producerForQueue;
    private MessageConsumer consumerForQueue;

    /**
     * Create Default service
     * @throws Exception 
     */
    public ActiveMQConnection() throws Exception
    {
        try {
                throw new JMSException("No Parameters defined for creating connection. Try constructor with parameters.");
        } catch (JMSException e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        } catch (Exception e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        }
    }


    /**
     * Create a service with desired parameters.
     * @param activeMQUser
     * @param activeMQPassword
     * @param activeMQURI
     * @param activeMQQueueName
     * @throws Exception 
     */
    public ActiveMQConnection(String activeMQUser, String activeMQPassword, String activeMQURI) throws Exception
    {
        try {
            this.activeMQUser = activeMQUser;
            this.activeMQPassword = activeMQPassword;
            this.activeMQURI =  activeMQURI;
            setUpActiveMQConnection();

        } catch (JMSException e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        } catch (Exception e) {
            logger.error("Exception in Active MQ Connection",e);
            throw e;
        }
    }

    /**
     * @throws JMSException, Exception 
     */
    private void setUpActiveMQConnection() throws JMSException, Exception
    {
        connectionFactory = new ActiveMQConnectionFactory(
                this.activeMQUser,
                this.activeMQPassword,
                this.activeMQURI );
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            isConnected = true;
        }catch (JMSException e) {
            isConnected = false;
            throw e;
        }catch(Exception e){
            isConnected = false;
            throw e;
        }
    }

    /**
     * @throws Exception
     */
    public void setUpQueue(String queueName ) throws Exception
    {
        this.activeMQQueueName = queueName;
        createQueue();
        createProducer();
        createConsumer();
    }

    /**
     * @throws Exception
     */
    private void createQueue() throws Exception 
    {
        try {
            if(destinationQueue == null)
                {   
                destinationQueue = session.createQueue(this.activeMQQueueName);
                isQueueAvailable = true;
                }
        } catch (JMSException e) {
            isQueueAvailable = false;
            throw e;
        }catch(Exception e){
            isQueueAvailable = false;
            throw e;
        }
    }

    /**
     * @throws JMSException 
     * 
     */
    private void createProducer() throws JMSException
    {   
        if(producerForQueue == null)
        {   
            try {
                producerForQueue = session.createProducer(destinationQueue);
                producerForQueue.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                isProducerAvailable = true;
            } catch (JMSException e) {
                isProducerAvailable = false;
                throw e;
            }
        }
    }

    /**
     * @throws JMSException
     */
    private void createConsumer() throws JMSException
    {   
        if(consumerForQueue == null)
        {   
            try {
                consumerForQueue = session.createConsumer(destinationQueue);
                isConsumerAvailable = true;
            } catch (JMSException e) {
                isConsumerAvailable = false;
                throw e;
            }
        }
    }

    /**
     * @param objectToQueue
     * @throws JMSException
     */
    public void sendMessage(Serializable objectToQueue) throws JMSException 
    {
        ObjectMessage message = session.createObjectMessage();
        message.setObject(objectToQueue);
        producerForQueue.send(message);
    }

    /**
     * @param objectToQueue
     * @throws JMSException
     */
    public Serializable receiveMessage() throws JMSException 
    {
        Message message = consumerForQueue.receive(timeout);
        if (message instanceof ObjectMessage) 
              { 
                  ObjectMessage objMsg = (ObjectMessage) message;
                  Serializable sobject = objMsg.getObject();
                  return sobject;
              }
        return null;
    }

    /**
     * close-MQ-Connection
     */
    public void closeMQConnection()
    {
        try 
        {
            if(consumerForQueue != null)
            {
            consumerForQueue.close();
            }
            if(producerForQueue != null)
            {
            producerForQueue.close();
            }
            if(session != null)
            {
            session.close();
            }
            if(connection != null )
            {
            connection.close();
            }   
        } 
        catch (JMSException e) 
            {
            logger.info("Error while closing connection.",e);
            }
        finally
            {
            consumerForQueue = null;
            producerForQueue = null;
            destinationQueue = null;
            session = null;
            connection = null;
            activeMQUser = null;
            activeMQPassword = null;
            activeMQQueueName = null;
            activeMQURI = null;
            }
    }

    public boolean isConnected() {
        return isConnected;
    }

    public void setConnected(boolean isConnected) {
        this.isConnected = isConnected;
    }

    public boolean isQueueAvailable() {
        return isQueueAvailable;
    }

    public void setQueueAvailable(boolean isQueueAvailable) {
        this.isQueueAvailable = isQueueAvailable;
    }

    public boolean isProducerAvailable() {
        return isProducerAvailable;
    }

    public void setProducerAvailable(boolean isProducerAvailable) {
        this.isProducerAvailable = isProducerAvailable;
    }

    public MessageConsumer getConsumerForQueue() {
        return consumerForQueue;
    }

    public void setConsumerForQueue(MessageConsumer consumerForQueue) {
        this.consumerForQueue = consumerForQueue;
    }

    public boolean isConsumerAvailable() {
        return isConsumerAvailable;
    }

    public void setConsumerAvailable(boolean isConsumerAvailable) {
        this.isConsumerAvailable = isConsumerAvailable;
    }
}

上面是一个实用程序类

  • creating connectioncreating/connecting to queuesending messagereceiving message
  • 您可以使用其中的方法来发送或接收任何可序列化的 POJO。

【讨论】:

    猜你喜欢
    • 2013-09-08
    • 1970-01-01
    • 1970-01-01
    • 2015-01-25
    • 2011-07-24
    • 2011-07-30
    • 2018-06-04
    • 2017-01-10
    • 2013-01-19
    相关资源
    最近更新 更多