【问题标题】:ActiveMQ:Non persistent Topic Consumer doesnt close sometimesActiveMQ:非持久主题消费者有时不会关闭
【发布时间】:2011-09-15 23:59:17
【问题描述】:

我正在使用 Apache ActiveMQ 5.5 并且有以下场景

-嵌入式代理

-NonPersistent Producer 和 Topic

-主题的消费者。

正常工作 - 我发布到一个主题,订阅者从中消费。

我已经实现了一个 MessageListener,所以当消费者订阅和取消订阅时,我会打印一些东西来表明这一点。 当我必须关闭消费者时,我只需调用它的 close 方法。

有时消费者会成功关闭 - 我看到日志和内存使用情况很好。

但有时它不会关闭,虽然我可以在日志中看到我调用了 close 方法。 这次在日志中 MessageListener 没有提到订阅者是如何取消订阅的。 结果,内存使用量上升了,因为现在发布者正在向主题发送消息,而我已经关闭了消费者(实际上并没有关闭) 并停止处理消息。

所以我不确定在哪里以及如何解决这个问题...... 我认为这与 asynch activemq 工作线程及其行为有关。

以下是我正在使用的与 ActiveMQ 相关的所有类。如果我应该添加任何其他代码,请告诉我。

            public class Consumer {

              private MessageConsumer consumer;

              public Consumer(String brokerUrl, String topic, MessageListener ml) throws JMSException {

                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);

                //with straight through processing of messages
                //and optimized acknowledgement
                cf.setAlwaysSessionAsync(false);
                cf.setOptimizeAcknowledge(true);
                Connection connection = cf.createConnection();
                connection.start();

                //-- Use the default session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                //-- Set the prefetch size for topics - by parsing a configuration parameter in
                // the name of the topic
                //-- topic=test.topic?consumer.prefetchSize=32766
                Topic topicObj = session.createTopic(topic);

                consumer = session.createConsumer(topicObj);

                //-- Attach the passed in message listener
                consumer.setMessageListener(ml);

              }

              /**
               * @return the consumer
               */
              public MessageConsumer getConsumer() {
                return consumer;
              }
            }   


            public class ConsumerAdvisoryListener implements MessageListener {

              private XMLLogUtil xlu;
              private MyLogger ml;

              public ConsumerAdvisoryListener() throws IOException{
                xlu=XMLLogUtil.getInstance();
                ml=xlu.getCustomLogger(ConsumerAdvisoryListener.class);
              }

              public void onMessage(Message message) {
                ActiveMQMessage msg = (ActiveMQMessage) message;
                DataStructure ds = msg.getDataStructure();
                if (ds != null) {
                  switch (ds.getDataStructureType()) {
                  case CommandTypes.CONSUMER_INFO:
                    ConsumerInfo consumerInfo = (ConsumerInfo) ds;
                    ml.info("Consumer '" + consumerInfo.getConsumerId()
                        + "' subscribed to '" + consumerInfo.getDestination()
                        + "'");
                    break;
                  case CommandTypes.REMOVE_INFO:
                    RemoveInfo removeInfo = (RemoveInfo) ds;
                    ConsumerId consumerId = ((ConsumerId) removeInfo.getObjectId());
                    ml.info("Consumer '" + consumerId + "' unsubscribed");
                    break;
                  default:
                    ml.info("Unkown data structure type");
                  }
                } else {
                  ml.info("No data structure provided");
                }
              }
            }


            public class EmbeddedBroker {

              /**
               * Singleton
               */
              private static EmbeddedBroker INSTANCE;
              private BrokerService broker; 

              /**
               * Return singleton instance
               * @return
               */
              public static EmbeddedBroker getInstance(){
                if(EmbeddedBroker.INSTANCE ==null){
                  throw new IllegalStateException("Not Initialized");
                }

                return INSTANCE;
              }

              /**
               * Initialize singleton instance.
               * 
               * @return
               * @throws Exception 
               */
              public static EmbeddedBroker initialize(String brokerName) throws Exception{
                if(EmbeddedBroker.INSTANCE ==null){
                  EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, false);
                }
                else{
                  throw new IllegalStateException("Already Initialized");
                }

                return INSTANCE;
              }

              /**
               * Initialize singleton instance.
               * 
               * @return
               * @throws Exception 
               */
              public static EmbeddedBroker initialize(String brokerName, boolean enableTCPConnector) throws Exception{
                if(EmbeddedBroker.INSTANCE ==null){
                  EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, enableTCPConnector);
                }
                else{
                  throw new IllegalStateException("Already Initialized");
                }

                return INSTANCE;
              }

              /**
               * Private constructor
               * @throws Exception 
               */
              private EmbeddedBroker(String brokerName, boolean enableTCPConnector) throws Exception{

                //-- By default a broker always listens on vm://<broker name>
                this.broker = new BrokerService();
                this.broker.setBrokerName(brokerName);

                //-- Enable Advisory Support.  Its true by default, but this is to explicitly mention it for documentation purposes
                this.broker.setAdvisorySupport(true);

                /* Create non-persistent broker to use inMemory Store,
                 * instead of KAHA or any other persistent store.
                 * See Section 4.6.1 of ActiveMQInAction  
                 */
                this.broker.setPersistent(false);

                //-- 64 MB
                this.broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);

                //-- Set the Destination policies
                PolicyEntry policy = new PolicyEntry();

                //-- Set a memory limit of 4mb for each destination
                policy.setMemoryLimit(4 * 1024 *1024);

                //-- Disable flow control
                policy.setProducerFlowControl(false);
                PolicyMap pMap = new PolicyMap();

                //-- Configure the policy
                pMap.setDefaultEntry(policy);
                this.broker.setDestinationPolicy(pMap);

                if(enableTCPConnector)
                  broker.addConnector("tcp://localhost:61616");

                //-- Start the Broker.
                this.broker.start();

              }

            }




            public class NonPersistentProducer {

              private final MessageProducer producer;
              private final Session session;

              public NonPersistentProducer(String brokerUrl, String topic) throws JMSException{
                //-- Tell the connection factory to connect to a broker and topic passed in.
                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);

                //-- Disable message copying
                cf.setCopyMessageOnSend(false);

                Connection connection = cf.createConnection();
                connection.start();

                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Topic topicObj = session.createTopic(topic);

                producer = session.createProducer(topicObj);

                //-- Send non-persistent messages
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

              }//-- Producer

              /**
               * @return the producer
               */
              public MessageProducer getProducer() {
                return producer;
              }

              /**
               * @return the session
               */
              public Session getSession() {
                return session;
              }



            }

【问题讨论】:

    标签: java activemq


    【解决方案1】:

    我想通了。关闭(阅读文档)方法会阻塞,直到接收或侦听器完成。在我的情况下,我的听众由于异常而迷路了。只要监听或接收成功完成,关闭就会完成,消费者会被移除。

    【讨论】:

      猜你喜欢
      • 2017-02-20
      • 1970-01-01
      • 2014-05-21
      • 1970-01-01
      • 1970-01-01
      • 2017-02-14
      • 2015-06-16
      • 2011-07-13
      • 2017-09-05
      相关资源
      最近更新 更多