【问题标题】:ActiveMQ Transport: tcp: Thread RUNNABLE state - too many threads hangingActiveMQ Transport: tcp: Thread RUNNABLE state - 挂起的线程太多
【发布时间】:2016-11-04 00:39:26
【问题描述】:

以下 ActiveMQ 实现存在于代码中。有时,系统停止工作并变得非常缓慢。当我使用 JavaMelody 检查线程转储时 - 我看到太多线程长时间处于可运行状态并且没有被终止。

ActiveMQ 版本 - activemq-all-5.3.0.jar

请参考以下代码:

经纪人:

public class ActiveMQ extends HttpServlet {

private static final long serialVersionUID = -1234568008764323456;
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName());
public Listener listener;

private String msgBrokerUrl = "tcp://localhost:61602";
public BrokerService broker = null;
public TransportConnector connector = null;

@Override
public void init() throws ServletException {

    try {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        connector = broker.addConnector(msgBrokerUrl);
        broker.setUseShutdownHook(true);
        System.out.println("BROKER LOADED");
        broker.start();
        broker.deleteAllMessages();

        listener = new Listener();

    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

听众:

public class Listener implements MessageListener {

private String msgQueueName = "jms/queue/MessageQueue";
public Session session;
public Destination adminQueue;

public static String id;

public ActiveMQConnection connection;
public MessageConsumer consumer = null;

public Listener() {
    try {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                new URI("failover://(" + "tcp://localhost:61602" + "?wireFormat.cacheEnabled=false"
                        + "&wireFormat.maxInactivityDuration=0&wireFormat.tightEncodingEnabled=true)?maxReconnectDelay=1000"));

        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        adminQueue = session.createQueue(msgQueueName);
        id = new Timestamp(new Date().getTime()).toString();
        consumer = this.session.createConsumer(this.adminQueue, "ID='" + id + "'");
        consumer.setMessageListener(this);
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@SuppressWarnings("unchecked")
@Override
public void onMessage(Message message) {
    TextMessage msg = (TextMessage) message;
    try {
        String xmlMsg = msg.getText();
        // business logic
    } catch (JMSException ex) {
        ex.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

制片人:

public class Producer {
private static String url = "tcp://localhost:61602";
private static String msgQueueName = "jms/queue/MessageQueue";

public ConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;

public Producer() {
    connectionFactory = new ActiveMQConnectionFactory(url);
}

public void sendResponse(String xml, DataBean objDataBean) {
    sendToQueue(xml, msgQueueName, objDataBean);
}

private void sendToQueue(String xml, String msgQueueName, DataBean obj) {

    try {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(msgQueueName);
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage(xml);
        message.setJMSExpiration(1000);
        message.setStringProperty(obj.getMsgKey(), obj.getMsgValue());
        producer.send(message);

        xml = null;
        session.close();
        connection.close();

    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) {

    for (int msg = 0; msg < 20; msg++) {
        DataBean obj = getData();
        new Producer().sendResponse(xml, obj);
        ;
    }
}

}

挂起线程异常详情:

类型 1:

ActiveMQ Transport: tcp:///127.0.0.1:41818
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)

类型 2:

ActiveMQ Transport: tcp://localhost/127.0.0.1:61602
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)

请您就这个问题提供一些提示以供进一步调查。

编辑: 我在互联网上阅读了几篇文章并得出结论,我必须更新 activemq jar 文件并实现超时,但是当我开始阅读有关超时设置的内容时,我很困惑是否应该在生产者和消费者或故障转移或消息或代理服务中设置超时。每个组件的超时都有不同的目的,那么考虑到上面的代码和异常,我应该在哪里实现超时。

【问题讨论】:

  • 你所描述的一切都是正常的。如果你有太多的 RUNNABLE 这意味着你有太多的线程来满足你所拥有的 CPU 能力。注意:让线程在socketRead0 上等待意味着该线程没有任何事情要做(它正在等待另一个发送写东西)
  • 为什么生成了太多的端口号——我的意思是每个线程都有一个唯一的端口号。
  • tcp://localhost/127.0.0.1:61602 - 这又是一个问题。
  • 根据 TCP/IP 规范,每个 Socket 都有自己的端口号。

标签: java jms activemq message-queue producer-consumer


【解决方案1】:

创建连接非常昂贵,当您关闭它时,端口会保留长达 3 分钟,以确保它完全关闭。

只有在确实必须避免性能问题时才想创建连接。我建议您创建一次连接,并保持该连接打开,除非您遇到错误。这可以将性能提高 2 到 3 个数量级。

这是一种很好的性能调优模式,适用于很多情况;

  • 仅在真正需要时才创建和销毁昂贵的资源。
  • 您多次执行的操作应保持在最低限度。即尽可能少地重复。

【讨论】:

  • 根据上面的代码,监听器连接只创建一次,它会在 onMessage 方法被连续调用的地方继续运行,但每次从 main 方法运行循环都会创建生产者连接。生产者这是我必须只创建一次连接的问题..请告知。
猜你喜欢
  • 1970-01-01
  • 2013-02-24
  • 2019-06-02
  • 2019-07-08
  • 2022-01-13
  • 2018-09-02
  • 1970-01-01
  • 2013-07-02
  • 2011-04-19
相关资源
最近更新 更多