【发布时间】:2021-06-08 18:20:21
【问题描述】:
我正在编写一个消费者来读取来自 MQ 的分段消息。我在其他队列中使用 Spring JMS / Spring Integration。我了解 IBM MQ 不支持 JMS 中的消息分段: (相关问题在这里。 How to assemble segments of MQ messages in Spring integration)
这是我想出的将 IBM MQ 类用于 java 和 Spring 的方法。
MQ 对象的 Bean 定义。
@Bean
public MQGetMessageOptions mqGetMessageOptions() {
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
+ CMQC.MQGMO_COMPLETE_MSG;
return getOptions;
}
@Bean
public MQQueueManager mqQueueManager() throws Exception {
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put(CMQC.CHANNEL_PROPERTY, channel);
properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
properties.put(CMQC.PORT_PROPERTY, new Integer(port));
MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
return qMgr;
}
@Bean
public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
return inboundQueue;
}
@Bean
public MessageChannel queueConsumerChannel() {
// return new DirectChannel();
return new ExecutorChannel(Executors.newFixedThreadPool(5));
}
消费者代码:
@Component
@Slf4j
public class MyQueueConsumer {
@Autowired
MQQueueManager qMgr;
@Autowired
MQGetMessageOptions mqGetMessageOptions;
@Autowired
MQQueue inboundQueue;
@Autowired
MessageChannel queueConsumerChannel;
@Autowired
MessageSaveService messageSaveService;
@EventListener(ApplicationReadyEvent.class)
public void consume() {
boolean getMore = true;
MQMessage receiveMsg = null;
while (getMore) {
try {
receiveMsg = new MQMessage();
log.info("Waiting to consume mesages from ....");
inboundQueue.get(receiveMsg, mqGetMessageOptions);
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
String fileName = getFileName();
Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
queueConsumerChannel.send(outMessage);
log.info("Message consumed and sent to processng channel");
// qMgr.commit();
} catch (MQException e) {
if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
log.error("Bottom of the queue reached.");
getMore = false;
} else {
log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
log.info("Is Connected :" + qMgr.isConnected());
log.info("Is open : " + qMgr.isOpen());
e.printStackTrace();
getMore = false;
}
}
}
}
@PreDestroy
public void closeMQObjects() {
System.out.println("Closing MQ objects ");
try {
if (inboundQueue != null)
inboundQueue.close();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
try {
if (qMgr != null)
qMgr.disconnect();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
}
}
使用此配置,Consumer 按需要工作,它组装所有分段消息并作为一条完整消息读取,并在队列中等待下一条消息到达。 但我面临的挑战是我明白了
com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2009'
每隔一段时间。我将标志设为假以退出 while 循环。 至今无法弄清楚这个异常的确切原因。 我如何从这个异常中恢复并继续等待队列并在消息到达时使用它们? 我使用 Spring 和 IBM MQ 所采取的方法是否存在任何缺陷
【问题讨论】:
-
请看这里:ibm.com/support/pages/…。看起来问题与 Spring 完全无关......
-
我同意.. 它与 Spring 无关。
-
您的 SVRCONN HBINT 设置为什么?如果您不知道,可以使用 IBM MQ classes for Java trace 找到它。您的连接是否通过了连接超时的网络设备(例如防火墙或负载平衡器)?
-
这个问题上的 cmets 提供了跟踪详细信息以及要查找的内容。 stackoverflow.com/questions/64284625/ibm-mq-session-disconnects
-
SVRCONN HBINT 设置为默认 HBINT(300) 。不确定这是否可以在客户端代码上设置。我没有在客户端设置值。我能够连接和阅读消息。在等待队列接收消息一段时间后,我看到了这个异常
标签: spring spring-integration ibm-mq mq