【问题标题】:Mule Persistent ActiveMQ RedeliveryPolicyMule Persistent ActiveMQ RedeliveryPolicy
【发布时间】:2017-09-23 10:04:12
【问题描述】:

我使用 Mule 作为 ESB 解决方案。我有一个队列,我从中获取消息并尝试向一直失败的服务发出 http 请求。

我在 ActiveMQ 上配置了 RedeliveryPolicy,如下所示:

    <spring:bean id="retryRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"
        name="retryRedeliveryPolicy">
        <spring:property name="maximumRedeliveries" value="76" />
        <spring:property name="initialRedeliveryDelay" value="300000" />
        <spring:property name="maximumRedeliveryDelay" value="3600000" />
        <spring:property name="useExponentialBackOff" value="true" />
        <spring:property name="backOffMultiplier" value="2" />
        <spring:property name="queue" value="*" />
    </spring:bean>

它会在 5 分钟后重试。然后 10 分钟 ,20,40,60,60,60... 大约 3 天

问题在于,重试逻辑是非持久性的。

假设,消息重试了 2 天。而且我已经部署了新版本的mule应用程序,或者重新启动了服务器......在这种情况下,重试逻辑将从5分钟,10分钟重新开始......因为重试状态由客户端保存在RAM内存中。

让 RedeliveryPolicy 持久化很热门?在我将在 2 天后重新启动服务器后,它必须继续重试 1 天。

我认为一个解决方案可能是将 timeToLive 设置为 72 小时的消息。但即使在重新启动服务器之后。它不会从一开始就每隔一小时重试一次。它将从 5 分钟开始...

【问题讨论】:

    标签: java mule activemq retry-logic


    【解决方案1】:

    ActiveMQ 有一种方法可以进行持久的重新交付,但它不是内置在 ConnectionFactory 上使用 RedeliveryPolicy 的,它旨在用于在失败前进行短时间的重新交付。

    但是,可以使用 ActiveMQ 调度程序和延迟消息构建持久重新传递。有点手动,但可行。在尝试此操作之前,请确保在 ActiveMQ 中打开 schedulerSupport="true"。

    JMS 属性:“delivery”跟踪重试次数,如果出现问题,catch 异常策略会延迟多次重新传递消息。实际的延迟由 ActiveMQ 处理,因此该流程可以处理数小时、数天等的延迟,并且可以承受 ActiveMQ 和 Mule 的重启。

    <?xml version="1.0" encoding="UTF-8"?>
    
    <mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
        xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
        xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
        xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
    http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
    http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
    http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">
    
        <spring:beans>
            <spring:bean name="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
                <spring:property name="maximumRedeliveries" value="0" />
            </spring:bean>
    
            <spring:bean name="cf"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <spring:property name="brokerURL" value="tcp://localhost:61616" />
                <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
            </spring:bean>
        </spring:beans>
    
        <jms:activemq-connector name="Active_MQ"
            specification="1.1" brokerURL="tcp://localhost:61616"
            validateConnections="true" doc:name="Active MQ" numberOfConsumers="1"
            maxRedelivery="-1" connectionFactory-ref="cf" />
    
        <flow name="testFlow">
            <jms:inbound-endpoint queue="IN" connector-ref="Active_MQ"
                doc:name="JMS">
                <jms:transaction action="ALWAYS_BEGIN" />
            </jms:inbound-endpoint>
    
            <set-property propertyName="delivery"
                value="#[message.inboundProperties['delivery'] or 0]" doc:name="Property" />
    
            <scripting:component doc:name="Throw Error">
                <scripting:script engine="Groovy"><![CDATA[throw new java.lang.RuntimeException("Error in processing")]]></scripting:script>
            </scripting:component>
    
            <choice-exception-strategy doc:name="Choice Exception Strategy">
                <catch-exception-strategy doc:name="Catch Exception Strategy"
                    when="#[message.outboundProperties['delivery'] &lt; 5]">
                    <logger level="ERROR"
                        message="Retry once more count #[message.outboundProperties['delivery']]"
                        doc:name="Logger" />
    
                    <set-property propertyName="AMQ_SCHEDULED_DELAY" value="3000"
                        doc:name="Property" />
                    <set-property propertyName="delivery"
                        value="#[message.outboundProperties['delivery'] + 1]" doc:name="Property" />
                    <jms:outbound-endpoint queue="IN"
                        connector-ref="Active_MQ" doc:name="JMS">
                        <jms:transaction action="ALWAYS_JOIN" />
                    </jms:outbound-endpoint>
    
                </catch-exception-strategy>
    
                <rollback-exception-strategy doc:name="Rollback Exception Strategy">
                    <logger level="ERROR" message="Giving up retry. Do whatever needed here." doc:name="Logger" />
                </rollback-exception-strategy>
            </choice-exception-strategy>
        </flow>
    </mule>
    

    【讨论】:

      【解决方案2】:

      这无法使 RedeliveryPolicy 持久化 - 它由连接工厂控制,并且在您重新启动服务器时工厂将被重置。按照您的配置方式,它将继续您看到的行为,因为您将 useExponentialBackOff 设置为 true。您可以将此设置为 false 以使延迟成为常规量,但这是唯一需要进行的更改。

      我认为您设置消息 TTL 的想法是正确的,至少这会在指定时间后删除消息。 JMSXDeliveryCount 很棒,但如果您增加重试之间的延迟,它将在多次尝试后移除,而不是定义的时间段。

      【讨论】:

        【解决方案3】:

        您可以使用 JMS 消息的 JMSXDeliveryCount 属性来检查它已经重试了多少次。重试时间间隔逻辑应该依赖于 JMSXDeliveryCount 变量。

        message.getIntProperty("JMSXDeliveryCount ")

        http://activemq.apache.org/activemq-message-properties.html

        【讨论】:

          猜你喜欢
          • 2019-03-27
          • 2021-08-09
          • 1970-01-01
          • 1970-01-01
          • 2018-01-04
          • 1970-01-01
          • 2018-03-17
          • 1970-01-01
          • 2015-02-11
          相关资源
          最近更新 更多