【问题标题】:Spring Boot + ActiveMQ programmatically subscribe to topics on the flySpring Boot + ActiveMQ 以编程方式即时订阅主题
【发布时间】:2018-10-24 02:46:33
【问题描述】:

我正在尝试实现一个功能,让我拥有的侦听器类可以订阅/取消订阅 JMS 主题。 经过一些研究,没有明确的方法可以做到这一点,我想出了两个解决方案:

  1. 有一个监听器类,它包含一个字符串主题名称列表,并定期运行它应该订阅的所有这些主题,并在每个主题上运行阻塞jmsTemplate.receiveAndConvert(topicName)(可能将阻塞操作本身委托给工作池)。 订阅/取消订阅主题就像从列表中删除主题名称一样简单。
  2. 有一个工厂类,它将为应用程序需要订阅的每个主题构建一个新的侦听器,使用如下方法:

    public MessageListenerContainer createListener(String topic) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
      container.setConnectionFactory(connectionFactory());
      container.setDestinationName(topic);
      container.setMessageListener(new MyListenerClass());
      return container;
    

    }

第二个选项对我来说似乎更优雅,但我不确定听众的生命周期。我浏览了一些 Spring Boot 的 jms 和 activemq 模块的源代码,并注意到 DefaultMessageListenerContainer 有方法 initialize()start() 尽管我不确定如何/是否需要使用它们,这是我唯一可以使用的方法找到以这种方式构建的MessageListenerContainer 是作为Bean 声明。 此外,当取消订阅某个主题,因此想要销毁与其关联的侦听器容器时,除了调用stop(callback) 方法之外,是否需要做更多的事情?

我对 JMS/ActiveMQ 及其 Spring 集成的理解是否正确,因为没有更简单的方法可以实现这一点?我的方法正确吗?

【问题讨论】:

    标签: java spring spring-boot activemq spring-jms


    【解决方案1】:

    恕我直言,只要你

    • 从spring获取connectionFactory(不是一个PooledConnectionFactory一个)
    • 在订阅时正确调用initialise()start()stop() 取消订阅
    • 不要期望在异常情况下重新传递消息

    第二种方法应该没问题

    【讨论】:

    • 请注意,您可以在此处找到示例:stackoverflow.com/a/28327558/2087640
    • 您还应该在stop() 之后调用shutDown() 以完全关闭一切(这与initialize() 正好相反)。
    • 或者在@PreDestroy方法中,是否可以在应用生命周期内重新连接
    • 感谢您的信息和链接,想知道为什么过去一天我一直无法解决这个问题
    【解决方案2】:

    要在运行时注册新的JmsListenerEndpoint,您必须完成 2 个步骤

    1 创建自定义MessageListenerservice

    @Service
    public class CustomMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("[Custom message listener] " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    2 注册新端点,使用JmsListenerEndpointRegistry

    @Service
    public class MessageListenersService {
        @Autowired
        private JmsListenerEndpointRegistry registry;
    
        @Autowired
        @Qualifier("containerFactory")
        private DefaultJmsListenerContainerFactory factory;
    
        public void registerListener(String queueNameToListen, MessageListener listener) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("ep-"+listener.hashCode()); // ID is mandatory
            endpoint.setMessageListener(listener);
            endpoint.setDestination(queueNameToListen);
            registry.registerListenerContainer(endpoint, factory, true);
        }
    }
    

    使用它

        private static final String CUSTOM_DESTINATION = "queue-42";
    
        @Autowired
        MessageListenersService messageListenersService;
        @Autowired
        CustomMessageListener customMessageListener;
        @Autowired
        JmsTemplate jmsTemplate;
    
        @PostConstruct
        public void createCustomListener() throws InterruptedException {
            messageListenersService.registerListener(CUSTOM_DESTINATION, customMessageListener);
            jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));
    
            // wait your message:
            TimeUnit.SECONDS.sleep(1);
        }
    

    【讨论】:

      猜你喜欢
      • 2018-01-15
      • 2014-01-21
      • 2021-03-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-16
      • 2020-08-12
      • 1970-01-01
      相关资源
      最近更新 更多