【问题标题】:Azure Service Bus : Amqp Idle Timeout condition = amqp:link:detach-forcedAzure 服务总线:Amqp 空闲超时条件 = amqp:link:detach-forced
【发布时间】:2019-12-09 11:46:23
【问题描述】:

我得到的错误:

2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.; nested exception is javax.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.] with root cause

javax.jms.JMSException: Idle link tracker, link qpid-jms:sender:ID:7300953e-f587-4ae3-b9fe-85b84e032554:1:101:1:order-update has been idle for 1800000ms TrackingId:801ab247-3f36-4470-8665-08846eb1c181_G24, SystemTracker:client-link34404815, Timestamp:2019-12-06T21:04:35 [condition = amqp:link:detach-forced]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:164)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:117)
    at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:262)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:906)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1800(AmqpProvider.java:102)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:792)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

依赖项

compile group: 'com.microsoft.azure', name: 'azure-servicebus-spring-boot-starter', version: '0.2.0'
compile group: 'javax.jms', name: 'javax.jms-api', version: '2.0.1'
compile group: 'org.apache.qpid', name: 'qpid-jms-client', version: '0.28.0'
compile group: 'org.apache.camel', name: 'camel-jms', version: '2.24.1'
compile group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.0.4.RELEASE'

jmsConnectionFactory 配置:

<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.qpid.jms.JmsConnectionFactory">
            <constructor-arg value="${azure.jms.url}" />
            <property name="username" value="${azure.jms.username}" />
            <property name="password" value="${azure.jms.password}" />
            <property name="clientID" value="AltaPay" />
            <property name="receiveLocalOnly" value="true" />
            <property name="localMessageExpiry" value="true" />
            <property name="populateJMSXUserID" value="true" />
        </bean>
    </property>
    <property name="exceptionListener">
        <bean class="com.lauraashley.microservice.altapay.callback.exception.CustomJMSExceptionListener" />
    </property>
    <property name="sessionCacheSize" value="10" />
    <property name="cacheConsumers" value="false" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>

CustomJMSExceptionListener

public class CustomJMSExceptionListener implements ExceptionListener {
    
    private static final Logger logger = getLogger(CustomJMSExceptionListener.class);

    @Override
    public void onException(JMSException exception) {
        // TODO Auto-generated method stub
        logger.error("--------------- Catched exception with CustomJMSExceptionListener ---------------");
        logger.error("Error code:"+exception.getErrorCode());
        logger.error("Msg:"+exception.getMessage());
        exception.printStackTrace();
        logger.error("---------------------------------------------------------------------------------");
    }
}

我如何复制它

第一:没有使用CustomJMSExceptionListener,是不是配置好?

该应用程序是OCC(oracle cloud commerce)平台上的电子商务应用程序,使用java spring-boot服务进行支付集成和流程。

当订单超过空闲时间然后与 Azure 服务总线的连接失败并且为了重新连接我必须重新启动 java 应用程序时会发生此错误,这是一个很大的问题,因为无法处理更多订单。我读到 CachingConnectionFactory 有 reconnectOnException,默认情况下是 true。

我真的不明白为什么会发生这种情况以及修复它的解决方案是什么。

【问题讨论】:

    标签: jms amqp spring-jms azure-servicebus-queues qpid


    【解决方案1】:

    异常表明 Azure 已关闭生产者,因为它空闲了很长时间,这意味着它没有在超时内发送消息(一些文档 here)。在使用 CachingConnectionFactory 时,您可以通过将 cache producers option 配置为 false 来解决此问题,以便按需创建生产者,但我不完全确定这一点,因为我没有任何方法来测试它。

    这不是 Qpid JMS 客户端级别的错误,而是 Azure 的行为,在我认为链接上没有活动十分钟后它会强制关闭链接。在非基于弹簧的应用程序中,您必须通过在发送时捕获 JMSException 来解决此问题,尝试创建新的生产者并再次发送,或者断开整个连接并重新开始。您的反应在一定程度上取决于您是否预知您正在使用 Azure 并知道这可能发生。

    【讨论】:

    • 我知道 Azure 会在 10 分钟内关闭连接,我一直在寻找解决方案。另请阅读 Azure 不接受心跳消息。
    • 您可以尝试使用故障转移来掩盖其中的一些问题,但不幸的是,您只需要处理这样一个事实,即 Azure 会杀死空闲资源并保持它们处于活动状态或使用非缓存资源,以便它们每次都重新创建。
    • @TimBish 谢谢!在 CachingConnectionFactory 上将缓存生产者选项设置为 false 对我有用。
    【解决方案2】:

    如上所述,这是 Azure 服务总线的预期行为,此问题在 azure-spring-boot 中公开。到目前为止,解决方法是将CachingConnectionFactory.cacheProducers 值设置为False,因此将为每个会话创建新的生产者。

    CachingConnectionFactory connectionFactory = (CachingConnectionFactory) jmsTemplate.getConnectionFactory();
    
    connectionFactory.setCacheProducers(false);
    

    另一种可能的方式,

    @Bean
        public ConnectionFactory jmsConnectionFactory(AzureServiceBusJMSProperties busJMSProperties){
            final String connectionString = busJMSProperties.getConnectionString();
            final String clientId = busJMSProperties.getTopicClientId();
            final int idleTimeout = busJMSProperties.getIdleTimeout();
    
            final ServiceBusKey serviceBusKey = ConnectionStringResolver.getServiceBusKey(connectionString);
    
            final String remoteUri = String.format("amqps://%s?amqp.idleTimeout=%d&amqp.traceFrames=true",
                    serviceBusKey.getHost(), idleTimeout);
    
            final JmsConnectionFactory jmsConnectionFactory =
                    new JmsConnectionFactory(
                            serviceBusKey.getSharedAccessKeyName(),
                            serviceBusKey.getSharedAccessKey(),
                            remoteUri
                    );
            jmsConnectionFactory.setClientID(clientId);
    
            CachingConnectionFactory cachingConnectionFactory =
                    new CachingConnectionFactory(jmsConnectionFactory);
                    // set cache producers to FALSE here
            cachingConnectionFactory.setCacheProducers(false);
    
            return cachingConnectionFactory;
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-02-25
      • 1970-01-01
      • 2015-08-11
      • 2017-08-27
      • 2022-06-14
      • 1970-01-01
      • 1970-01-01
      • 2021-02-02
      相关资源
      最近更新 更多