【问题标题】:DefaultMessageListenerContainer Not Reading Messages from IBM MQDefaultMessageListenerContainer 未从 IBM MQ 读取消息
【发布时间】:2017-06-15 19:56:49
【问题描述】:

我正在尝试创建一些使用 DefaultMessageListenerContainer 来监听来自 IBM MQ 的消息的 Spring Boot 代码。

我可以创建 MQQueueConnectionFactory 并使用 JmsTemplate 发送和接收消息,但这旨在实现高吞吐量,并且希望使用侦听器而不是轮询。

我已将大部分代码整合到一个组件中,所以我希望我拥有所有相关的内容。

如果我安排了 receiveMessage 方法,它会提取排队的消息,因此我知道 sendMessage 方法正在排队消息。

@Component
class AllInOneTest {

    private MessagingConfiguration.QueueConfig config;
    private MQQueueConnectionFactory           connectionFactory;
    private JmsTemplate                        jmsTemplate;
    private DefaultMessageListenerContainer    listenerContainer;
    private final Logger                       logger = LoggerFactory.getLogger(getClass());

    public AllInOneTest(MessagingManager manager) throws JMSException {
        String detailsName = "default";
        config = manager.getMessagingDetails(detailsName).getConfig();

        logger.debug("AllInOneTest Initializing Connection Factory: {}", detailsName);
        connectionFactory = new MQQueueConnectionFactory();
        connectionFactory.setHostName(config.getHost());
        connectionFactory.setPort(config.getPort());
        connectionFactory.setTransportType(config.getTransportType());
        connectionFactory.setQueueManager(config.getQueueManager());
        connectionFactory.setChannel(config.getChannel());

        logger.debug("AllInOneTest Initializing Message Listener: {}", detailsName);
        DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
        defaultListener.setConnectionFactory(connectionFactory);

        defaultListener.setExceptionListener((ee) -> {
            logger.warn(String.format("AllInOneTest Message Listener Error: %s", detailsName), ee);
        });

        defaultListener.setDestinationResolver((session, name, pubSub) -> {
            Destination ret = session.createQueue(name);

            logger.debug("AllInOneTest Created Listener Destination: {}", ret);

            return ret;
        });

        defaultListener.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                logger.info("AllInOneTest Listening For Message: {}", message);
            }
        });

        // TODO Configure subscription.
        // defaultListener.setSubscriptionDurable(true);
        // defaultListener.setSubscriptionName("masher-service");

        // TODO Configure concurrency.
        // defaultListener.setConcurrency(config.getConcurrency());

        // TODO Configure transaction.
        // defaultListener.setSessionTransacted(config.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);

        listenerContainer = defaultListener;

        logger.debug("AllInOneTest Initializing JMS Template: {}", detailsName);
        jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(new SpringToJMSMessageConverter());
        jmsTemplate.setReceiveTimeout(1000L);
        jmsTemplate.setDefaultDestinationName(config.getOutputQueue());
        jmsTemplate.setDestinationResolver((session, name, pubSub) -> {
            Destination ret = session.createQueue(name);

            logger.debug("AllInOneTest Created JMS Template Destination: {}", ret);

            return ret;
        });

        listenerContainer.setDestinationName(config.getOutputQueue());

        logger.debug("AllInOneTest Starting Message Listener: {} on {}", detailsName, config.getOutputQueue());
        listenerContainer.start();
    }

    // @Scheduled(fixedRate = 500L)
    public void receiveMessage() {
        Object message = jmsTemplate.receiveAndConvert();
        if (message != null) {
            logger.info("AllInOneTest Received: {}", message);
        }
    }

    @Scheduled(fixedRate = 1500L)
    public void sendMessage() {
        int count = counter.incrementAndGet();
        org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(String.format("JMS Masher Message %d %s %s", count,
                new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), UUID.randomUUID().toString())).build();

        logger.info("AllInOneTest Sending: {} [{}]", message.getPayload(), message.getHeaders());

        jmsTemplate.convertAndSend(config.getInputQueue(), message);
    }

}

我正在调用 DefaultMessageListenerContainer.start(),但我感觉它不是“开始”,我一定是遗漏了一些东西。

JmsTemplate 调用 DestinationResolver,但 DefaultMessageListenerContainer 没有调用。

我在控制台中没有看到任何异常。

感谢您的帮助, 韦斯。

【问题讨论】:

    标签: java spring-boot ibm-mq spring-jms


    【解决方案1】:

    DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();

    当您以编程方式创建容器时,而不是让 Spring 将其作为 @Bean 进行管理,您必须调用 afterPropertiesSet()(在您设置所有属性之后,在您 start() 之前)。

    对于许多 Spring 组件来说都是如此。通常让 Spring 管理它们会更好。

    【讨论】:

    • 啊。我在一些不相关的样本中看到了这一点。我有一个 @Bean@JmsListener 实现工作,但我可能需要根据配置配置一个或多个侦听器。我正在使用 Spring,所以我不需要编写程序代码,但要求并不总是有效。谢谢。
    • 没关系,明白了;但每当您自己实例化 Spring 提供的组件时,您应该始终检查它们是否实现了 InitializingBean,如果实现了,请调用 afterPropertiesSet()
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-08-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多