【问题标题】:Active MQ connection issue活动 MQ 连接问题
【发布时间】:2018-01-05 06:25:59
【问题描述】:

您好,我正在开发 wso2 esb 并使用 Active MQ 作为消息队列。

我有一个简单的服务来放置一条消息,它调用自定义 java 类在其中创建 tcp 连接并将消息放入队列中。

Java 代码如下所示

   package in.esb.custommediators;

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext; 
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;

import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.transport.nhttp.NhttpConstants;

import org.json.JSONObject;
import org.json.XML;

public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle { 

    Connection connection;
    Session session;

    public boolean mediate(MessageContext msgCtx) { 

        log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
                ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
                ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START"); 


         try {
             boolean topic=false;
            String jmsuri=""+msgCtx.getProperty("jmsuri");
            String t=""+msgCtx.getProperty("topic");
            if(t.isEmpty()){
                topic=false;
            }
            else {
                topic=Boolean.valueOf(t);
            }
            ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri);
            connection = factory.createConnection();
                connection.start();

            log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection);
            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination=null;
            if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue"));
            else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue"));
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume();

            if(topic){

                JSONObject obj=XML.toJSONObject(xml);
                JSONObject ar=obj.getJSONObject("soapenv:Body");
                ar.remove("xmlns:soapenv");
                xml=ar.toString();
            }
            TextMessage message = session.createTextMessage(xml);
            producer.send(message);


        } catch (Exception e) {

            log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage());
            e.printStackTrace();

            ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
            handleException("Error while storing in the message store", msgCtx);

        }
        finally {
            try {
                session.close();
                if (connection!=null){
                    log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection);
                    connection.close();
                }

            } catch (JMSException e) {
                log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
                e.printStackTrace();
            }
        }

        return true;
    }

    @Override
    public void destroy() {
        // TODO Auto-generated method stub

    }

    @Override
    public void init(SynapseEnvironment arg0) {
        // TODO Auto-generated method stub

    }

}

当我调用此服务以在队列中发送消息时,生成日志。

[2017-07-29 11:18:35,962]  INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true}

目前一切正常,但是当两个用户尝试在同一个轮胎上提交消息时,会发生一些奇怪的事情,如下所示

[2017-07-29 11:43:11,948]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false}
[2017-07-29 11:43:11,963]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true}

[2017-07-29 11:43:12,068]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed

Active MQ 正在创建两个连接,但对两个调用都使用一个连接,并且一个连接在其中一个服务调用中关闭,并在另一个服务调用中抛出已经关闭的错误,而另一个连接在连接中永远等待活动状态为 true 的活动 mq 列表如下图所示,这也可以在 ESB 线程列表中看到。

这种连接堆积并导致 ESB 服务器挂起。即使我从 Active MQ ESB 线程重置此连接也会携带此连接信息,并且只有在 ESB 重新启动后问题才会得到解决。

【问题讨论】:

  • 连接变量的初始化在哪里?看起来对连接的引用在另一个线程中可用
  • 这是自定义中介实现吗?你能提供完整的课程代码吗?很明显这里有一些多线程问题
  • 嗨@simar 是的,我正在使用自定义调解器,我已经使用正在使用的完整自定义类编辑了我的问题,正如你所说,它有一些多线程问题。

标签: java wso2 activemq wso2esb wso2carbon


【解决方案1】:

你读过文章Extending the Functionality of WSO2 Enterprise Service Bus - Part 1 吗?

重要的部分是线程安全。它指出,每个中介,包括自定义,在传入消息之间共享。我建议移动类变量

Connection connection;
Session session;

到方法 public boolean mediate(MessageContext msgCtx) 因为局部变量是线程安全的

public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle {     

    public boolean mediate(MessageContext msgCtx) { 
             Connection connection;
             Session session;
    ....
    ....
    rest the same

【讨论】:

  • 根据您的建议进行了更改并进行了检查,但 dint 解决了同样的问题。我会仔细阅读这篇文章。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-01-02
  • 2014-08-30
  • 1970-01-01
  • 1970-01-01
  • 2012-11-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多