【问题标题】:How to recover from com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2009'如何从 com.ibm.mq.MQException 中恢复:MQJE001:完成代码“2”,原因“2009”
【发布时间】:2021-06-08 18:20:21
【问题描述】:

我正在编写一个消费者来读取来自 MQ 的分段消息。我在其他队列中使用 Spring JMS / Spring Integration。我了解 IBM MQ 不支持 JMS 中的消息分段: (相关问题在这里。 How to assemble segments of MQ messages in Spring integration)

这是我想出的将 IBM MQ 类用于 java 和 Spring 的方法。

MQ 对象的 Bean 定义。

@Bean
    public MQGetMessageOptions mqGetMessageOptions() {
        MQGetMessageOptions getOptions = new MQGetMessageOptions();
        getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
        getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
                + CMQC.MQGMO_COMPLETE_MSG;
        return getOptions;
    }

    @Bean
    public MQQueueManager mqQueueManager() throws Exception {
        Hashtable<String, Object> properties = new Hashtable<String, Object>();
        properties.put(CMQC.CHANNEL_PROPERTY, channel);
        properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
        properties.put(CMQC.PORT_PROPERTY, new Integer(port));
        MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
        return qMgr;
    }

    @Bean
    public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
        int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
        MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
        return inboundQueue;
    }

    @Bean
    public MessageChannel queueConsumerChannel() {
        // return new DirectChannel();
        return new ExecutorChannel(Executors.newFixedThreadPool(5));
    }

消费者代码:

@Component
@Slf4j
public class MyQueueConsumer {

    @Autowired
    MQQueueManager qMgr;

    @Autowired
    MQGetMessageOptions mqGetMessageOptions;

    @Autowired
    MQQueue inboundQueue;

    @Autowired
    MessageChannel queueConsumerChannel;

    @Autowired
    MessageSaveService messageSaveService;


    @EventListener(ApplicationReadyEvent.class)
    public void consume() {
        boolean getMore = true;
        MQMessage receiveMsg = null;
        while (getMore) {
            try {
                receiveMsg = new MQMessage();
                log.info("Waiting to consume mesages from ....");
                inboundQueue.get(receiveMsg, mqGetMessageOptions);
                byte[] b = new byte[receiveMsg.getMessageLength()];
                receiveMsg.readFully(b);
                String fileName = getFileName();
                Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
                queueConsumerChannel.send(outMessage);
                log.info("Message consumed and sent to processng channel");
                // qMgr.commit();
            } catch (MQException e) {
                if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
                    log.error("Bottom of the queue reached.");

                    getMore = false;
                } else {
                    log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
                    log.info("Is Connected :" + qMgr.isConnected());
                    log.info("Is open : " + qMgr.isOpen());
                    e.printStackTrace();
                    getMore = false;
                }
            } 
        }

    }

    @PreDestroy
    public void closeMQObjects() {
        System.out.println("Closing MQ objects ");
        try {
            if (inboundQueue != null)
                inboundQueue.close();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
        try {
            if (qMgr != null)
                qMgr.disconnect();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
    }
}

使用此配置,Consumer 按需要工作,它组装所有分段消息并作为一条完整消息读取,并在队列中等待下一条消息到达。 但我面临的挑战是我明白了

com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2009'

每隔一段时间。我将标志设为假以退出 while 循环。 至今无法弄清楚这个异常的确切原因。 我如何从这个异常中恢复并继续等待队列并在消息到达时使用它们? 我使用 Spring 和 IBM MQ 所采取的方法是否存在任何缺陷

【问题讨论】:

  • 请看这里:ibm.com/support/pages/…。看起来问题与 Spring 完全无关......
  • 我同意.. 它与 Spring 无关。
  • 您的 SVRCONN HBINT 设置为什么?如果您不知道,可以使用 IBM MQ classes for Java trace 找到它。您的连接是否通过了连接超时的网络设备(例如防火墙或负载平衡器)?
  • 这个问题上的 cmets 提供了跟踪详细信息以及要查找的内容。 stackoverflow.com/questions/64284625/ibm-mq-session-disconnects
  • SVRCONN HBINT 设置为默认 HBINT(300) 。不确定这是否可以在客户端代码上设置。我没有在客户端设置值。我能够连接和阅读消息。在等待队列接收消息一段时间后,我看到了这个异常

标签: spring spring-integration ibm-mq mq


【解决方案1】:

不要将inboundQueue定义为一个bean,而是在while循环中分配它,当你成功结束循环或遇到可重试的异常时分配close它。

很遗憾,我不能给你一个可重试异常的列表,但2009 绝对是其中之一。

延迟重试是个好习惯,例如参见ExponentialBackOff

【讨论】:

    猜你喜欢
    • 2021-05-06
    • 1970-01-01
    • 2014-03-07
    • 1970-01-01
    • 1970-01-01
    • 2011-06-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多