【问题标题】:ActiveMQ Consumer / Producer Connection ListenerActiveMQ 消费者/生产者连接监听器
【发布时间】:2010-12-14 17:40:58
【问题描述】:

我似乎找不到在 ActiveMQ(Java 版本)中侦听新的生产者和消费者连接(或连接中断)的方法。我希望能够告诉消费者(或者他们可以自己发现)生产者的连接断开了。反过来(生产者发现某个消费者断开连接)也是必需的。

我将不胜感激。

【问题讨论】:

    标签: java jms activemq


    【解决方案1】:

    我需要的所有信息都发布在 ActiveMQ 咨询主题中,例如“ActiveMQ.Advisory.Connection”或简单的“ActiveMQ.Advisory..>”。甚至在 Stomp Connection 中发生的事件也会发布在 ActiveMQ 咨询主题中。以下代码给出了一个示例(使用通过 Stomp 连接的 Flex 客户端进行测试):

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(transacted, ackMode);
    connection.start();
    Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
    MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
    consumerAdvisory.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
    
                if (message instanceof ActiveMQMessage) {
                    ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                    Object command = activeMessage.getDataStructure();
                    if (command instanceof ConsumerInfo) {
                        System.out.println("A consumer subscribed to a topic or queue: " + command);
                    } else if (command instanceof RemoveInfo) {
                        RemoveInfo removeInfo = (RemoveInfo) command;
                        if (removeInfo.isConsumerRemove()) {
                            System.out.println("A consumer unsubscribed from a topic or queue");
                        }
                        else {
                            System.out.println("RemoveInfo, a connection was closed: " + command);
                        }
                    } else if (command instanceof ConnectionInfo) {
                        System.out.println("ConnectionInfo, a new connection was made: " + command);
                    } else {
                        System.out.println("Unknown command: " + command);
                    }
                }
        }
    });
    

    【讨论】:

      【解决方案2】:

      我认为您想在特定目的地(特定队列或主题)上监听新的生产者和消费者。对吗?

      您可以实例化 ConsumerEventSource 和 ProducerEventSource,并通过分别调用它们的 setConsumerListener 和 setProducerListener 来注册自己的监听器。

      所以:

      Connection conn = yourconnection; // the connection your listener will use
      Destination dest = yourdestination; // the destination you're paying attention to
      ConsumerEventSource source = new ConsumerEventSource(conn, dest);
      source.setConsumerListener(new ConsumerListener() {
      
         public void onConsumerEvent(ConsumerEvent event) {
            if (event.isStarted()) {
               System.out.println("a new consumer has started - " + event.getConsumerId());
            } else {
               System.out.println("a consumer has dropped - " + event.getConsumerId());
            }
         }
      
      });
      

      如果您查看 ConsumerEventSource 或 ProducerEventSource 的代码,您会发现它们是简单的对象,它们使用 AdvisorySupport 的方法来监听一个特殊的咨询主题,该主题的业务是广播有关生产者和消费者的新闻。您可以通过阅读这些类的源代码来了解更多信息。

      您对“连接”的使用可能存在问题;在 ActiveMQ 领域(它是 JMS 领域的一个子集)中,“连接”是与特定目的地无关的较低级别的对象。一个特定的客户端从一个连接创建一个“会话”——仍然不是特定于一个目标——然后创建一个特定于目标的 QueueSender、QueueReceiver、TopicPublisher 或 TopicSubscriber。当这些事件被创建时,或者当创建它们的会话终止时,这些就是您想听到的事件,如果您使用上面的代码,就会听到这些事件。

      【讨论】:

      • 非常感谢!这正是我的意思。我还尝试将此代码应用于 Stomp 连接(failover://stomp://localhost:61613),但不幸的是它不起作用。我认为这是因为通过 Stomp 订阅 (issues.apache.org/activemq/browse/AMQ-2098) 收到的咨询消息是空的。但是,如果您知道任何解决方法,请告诉我。
      猜你喜欢
      • 2015-07-23
      • 2011-03-12
      • 2012-01-10
      • 2012-08-04
      • 1970-01-01
      • 2020-06-17
      • 2010-10-19
      • 2016-04-23
      • 2021-03-06
      相关资源
      最近更新 更多