【问题标题】:Spring Batch - Not all records are being processed from MQ retrievalSpring Batch - 并非所有记录都从 MQ 检索中处理
【发布时间】:2015-08-11 15:24:06
【问题描述】:

我对 Spring 和 Spring Batch 相当陌生,所以如果您有任何澄清问题,请随时提出。

我发现 Spring Batch 存在无法在我们的测试或本地环境中重新创建的问题。我们的日常工作是通过 JMS 连接到 Websphere MQ 并检索一组记录。此作业使用开箱即用的 JMS ItemReader。我们实现了自己的 ItemProcessor,但它除了记录之外没有做任何特别的事情。没有应该影响传入记录的过滤器或处理。

问题在于,在 MQ 上的 10,000 多条日常记录中,通常只有大约 700 条左右(每次确切数字不同)记录在 ItemProcessor 中。所有记录都成功地从队列中拉出。每次记录的记录数都不一样,似乎没有规律。通过将日志文件与 MQ 中的记录列表进行比较,我们可以看到看似随机的记录子集正在被我们的工作“处理”。第一条记录可能会被拾取,然后跳过 50 条,然后连续 5 条,依此类推。每次作业运行时模式都不同。也没有记录异常。

在 localhost 中运行相同的应用程序并使用相同的数据集进行测试时,ItemProcessor 成功检索并记录了所有 10,000 多条记录。该作业在生产环境中运行 20 到 40 秒(也不是恒定的),但在测试和本地环境中需要几分钟才能完成(这显然是有道理的,因为它要处理更多的记录)。

所以这是难以解决的问题之一,因为我们无法重新创建它。一个想法是实现我们自己的 ItemReader 并添加额外的日志记录,以便我们可以查看记录是在阅读器之前还是在阅读器之后丢失 - 我们现在所知道的是只有一部分记录正在由 ItemProcessor 处理。但即使这样也不能解决我们的问题,考虑到它甚至不是解决方案,实施起来还是有点及时的。

有没有其他人看到过这样的问题?任何可能的想法或故障排除建议将不胜感激。以下是我们用作参考的一些 jar 版本号。

  • 春季 - 3.0.5.RELEASE
  • Spring 集成 - 2.0.3.RELEASE
  • 春季批次 - 2.1.7.RELEASE
  • 活动 MQ - 5.4.2
  • Websphere MQ - 7.0.1

提前感谢您的意见。

编辑:根据请求,处理器代码:

public SMSReminderRow process(Message message) throws Exception {

    SMSReminderRow retVal = new SMSReminderRow();
    LOGGER.debug("Converting JMS Message to ClaimNotification");
    ClaimNotification notification = createClaimNotificationFromMessage(message);

    retVal.setShortCode(BatchCommonUtils
            .parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
    retVal.setUuid(UUID.randomUUID().toString());
    retVal.setPhoneNumber(notification.getPhoneNumber());
    retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());

    DCRContent content = tsContentHelper.getTSContent(Calendar
            .getInstance().getTime(),
            BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
            BatchCommonConstants.TS_TAG_SMSTEXT_TYP);

    String claimsNotificationMessage = formatMessageToSend(content.getContent(),
            notification.getCorpEntCode());

    retVal.setMessageToSend(claimsNotificationMessage);
    retVal.setDateTimeToSend(TimeUtils
            .getGMTDateTimeStringForDate(new Date()));

    LOGGER.debug(
            "Finished processing claim notification for {}. Writing row to file.",
            notification.getPhoneNumber());
    return retVal;
}

JMS 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jms/SMSClaimNotificationCF" />
    <property name="lookupOnStartup" value="true" />
    <property name="cache" value="true" />
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>

<bean id="jmsDestinationResolver"
    class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>

<bean id="jmsJndiDestResolver" 
    class=" org.springframework.jms.support.destination.JndiDestinationResolver"/>  

<bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="claimsQueueConnectionFactory" />
    <property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
    <property name="destinationResolver" ref="jmsJndiDestResolver" />
    <property name="pubSubDomain">
        <value>false</value>
    </property>
    <property name="receiveTimeout">
        <value>20000</value>
    </property>
</bean>

【问题讨论】:

  • 我认为在任何人可以帮助您之前,您需要一个可重复性最低的样本。除此之外,这一切都只是猜测:某处必须有异常或日志文件会提供更多信息。是否有线程在吐槽,或者您对正在被垃圾收集的对象有 WeakReferences?我可能会将 GC 与“跳过”进行比较,以查看对象是否在完成工作之前被收集(很难相信会是这种情况,但值得一看)。您是否在生产中遇到网络或其他超时?
  • 感谢@mttdbrd 的输入。我知道这是在黑暗中拍摄,但我们没有太多信息可以继续。我希望有人以前见过类似的行为,并能指出我正确的方向。我们最好的猜测是 Websphere、Spring Batch 和/或 MQ 之间存在某种兼容性问题。到目前为止,我们还没有在任何生产日志中发现任何类型的异常或错误,但我会针对垃圾收集问题的任何迹象进行一些额外的研究。生产中也没有超时迹象。
  • 能否展示一下您的 Spring Batch 配置和处理器
  • 我在原始问题中添加了处理器和 jms 配置,正如你问 Palcente 的那样。

标签: java spring activemq spring-batch ibm-mq


【解决方案1】:

作为一项规则,MQ 在正确配置时不会丢失消息。那么问题是“正确配置”是什么样的?

通常,丢失消息是由非持久性或非事务性 GET 引起的。

如果非持久性消息正在遍历 QMgr-to-QMgr 通道并且设置了NPMSPEED(FAST),则如果错误丢失,MQ 将不会记录错误。这就是这些选项的用途,因此不会出现错误。

修复:在 QMgr-to-QMgr 通道上设置 NPMSPEED(NORMAL) 或使消息持久化。

如果客户端在同步点之外获取消息,则消息可能会丢失。这与 MQ 无关,它只是一般消息传递的工作方式。如果您告诉 MQ 破坏性地从队列中获取消息并且它无法将该消息传递到远程应用程序,那么 MQ 回滚它的唯一方法是在同步点下检索该消息。

修复:使用事务处理的会话。

还有一些额外的注释,源于经验。

  • 每个人都发誓消息持久性设置为他们认为的那样。但是,当我停止应用程序并手动检查消息时,非常 通常 不是预期的。这很容易验证,所以不要假设。
  • 如果消息在队列中回滚,直到 MQ 或 TCP 使孤立通道超时才会发生。这可能长达 2 小时,因此请调整通道参数和 TCP Keepalive 以减少这种情况。
  • 检查 MQ 的错误日志(QMgr 而非客户端的错误日志)以查找有关事务回滚的消息。
  • 如果您仍然无法确定消息的去向,请尝试使用SupportPac MA0W 进行跟踪。此跟踪作为出口运行,它是非常可配置的。您可以跟踪单个队列上的所有 GET 操作,并且只能跟踪该队列。输出采用人类可读的形式。

【讨论】:

  • 我需要请我们的 MQ 人员审查这些建议 T.Rob。目前所有记录都从 MQ 传输到分布式 MQ,这是我们从中提取记录的地方。
【解决方案2】:

http://activemq.apache.org/jmstemplate-gotchas.html

使用 JMSTemplate 存在问题。我只是在升级硬件并突然暴露了预先存在的竞争条件时才遇到这些问题。

简而言之,根据设计和意图,JMS 模板会在每次调用时打开和关闭连接。它不会看到早于其创建的消息。在大容量和/或高吞吐量的情况下,它将无法读取某些消息。

【讨论】:

  • 我们团队的另一位开发人员通过反复试验发现了这一点 - 但这也正是他发现的问题所在。我将此标记为解决方案,因为您提供了一个有用的链接,其中包含一些其他详细信息。
猜你喜欢
  • 2015-09-15
  • 2017-08-07
  • 1970-01-01
  • 1970-01-01
  • 2018-10-24
  • 2020-09-24
  • 2021-07-10
  • 2016-12-15
  • 1970-01-01
相关资源
最近更新 更多