【Question title】: Camel/ActiveMQ Transactions, Redelivery and DLQs
【Posted】: 2014-06-19 09:19:31
【Question】:

Using Fabric8 build 379.

I am currently struggling to get the desired TransactionErrorHandler behaviour working as expected with ActiveMQ and Camel.

First, according to the Camel error handler documentation (http://camel.apache.org/error-handler.html), if I declare the TransactionErrorHandler as suggested, i.e.

<errorHandler id="txEH" type="TransactionErrorHandler">
<redeliveryPolicy logStackTrace="false" logExhausted="false" maximumRedeliveries="3"/>
</errorHandler>

I get an error:

Caused by: org.xml.sax.SAXParseException: cvc-enumeration-valid: Value 'TransactionErrorHandler' is not facet-valid with respect to enumeration '[DeadLetterChannel, DefaultErrorHandler, NoErrorHandler, LoggingErrorHandler]'. It must be a value from the enumeration.

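Fair enough; I guess TransactionErrorHandler has been removed from the schema and has to be declared differently? So if I take the alternative route and specify a TransactionErrorHandler bean like so: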

<bean id="transactionErrorHandler"
    class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder">
    <property name="deadLetterUri" value="activemq:queue:ActiveMQ.DLQ" />
    <property name="redeliveryPolicy" ref="redeliveryPolicy" /> 
</bean>

<bean id="redeliveryPolicy" class="org.apache.camel.processor.RedeliveryPolicy">
    <property name="backOffMultiplier" value="2" />
    <property name="maximumRedeliveries" value="2" />
    <property name="redeliveryDelay" value="1000" />
    <property name="useExponentialBackOff" value="true" />
</bean> 

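I can use it successfully in my route by specifying errorHandlerRef="transactionErrorHandler". However, when I test this scenario the redeliveryPolicy is completely ignored: the message is redelivered 6 times (the default) instead of the 2 specified above. I am hoping someone can point me in the right direction on how to specify a TransactionErrorHandler correctly in a route. Below is my current test blueprint.xml, which is deployed to a fabric: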

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
    xmlns:camel="http://camel.apache.org/schema/blueprint"
    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
                              http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd
                              http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd 
                              http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0 http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.0.0.xsd">

    <!-- blueprint property placeholders -->
    <cm:property-placeholder id="test-adapter" persistent-id="uk.test.transactions">
        <cm:default-properties>
            <cm:property name="amqBrokerURL" value="discovery:(fabric:platform)" />
            <cm:property name="amqBrokerUserName" value="admin" />
            <cm:property name="amqBrokerPassword" value="admin" />
        </cm:default-properties>
    </cm:property-placeholder>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint" id="TestRouteContext" useMDCLogging="true">

        <!-- <errorHandler id="txEH" type="TransactionErrorHandler">
            <redeliveryPolicy logStackTrace="false"
                logExhausted="false" />
        </errorHandler> -->

        <route id="platform-test-route" errorHandlerRef="txEH">
            <from uri="activemq:queue:test-queue-in" />
            <transacted ref="transactionPolicy" />
                    <!-- Basic Bean that logs a message -->
            <bean ref="stubSuccess" />
                    <!-- Basic Bean that throws a java.lang.Exception-->
            <bean ref="stubFailure" />
            <to uri="activemq:queue:test-queue-out" />
        </route>

    </camelContext>

    <bean id="stubSuccess" class="uk.test.transactions.stubs.StubSuccess" />

    <bean id="stubFailure" class="uk.test.transactions.stubs.StubFailure" />

    <bean id="transactionErrorHandler"
        class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder">
        <property name="deadLetterUri" value="activemq:queue:ActiveMQ.DLQ" />
        <property name="redeliveryPolicy" ref="redeliveryPolicy" /> 
    </bean>

    <bean id="transactionPolicy" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="jmsTransactionManager" />
        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
    </bean>

    <bean id="jmsTransactionManager"
        class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
        <property name="transacted" value="true" />
        <property name="transactionManager" ref="jmsTransactionManager" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" />
    </bean>

    <bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="1" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

<!--    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="backOffMultiplier" value="2" />
        <property name="initialRedeliveryDelay" value="2000" />
        <property name="maximumRedeliveries" value="2" />
        <property name="redeliveryDelay" value="1000" />
        <property name="useExponentialBackOff" value="true" />
    </bean> -->

    <bean id="redeliveryPolicy" class="org.apache.camel.processor.RedeliveryPolicy">
        <property name="backOffMultiplier" value="2" />
        <property name="maximumRedeliveries" value="2" />
        <property name="redeliveryDelay" value="1000" />
        <property name="useExponentialBackOff" value="true" />
    </bean> 

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${amqBrokerURL}" />
        <property name="userName" value="${amqBrokerUserName}" />
        <property name="password" value="${amqBrokerPassword}" />
        <property name="watchTopicAdvisories" value="false" />
        <!-- <property name="redeliveryPolicy" ref="redeliveryPolicy" /> -->
    </bean>

</blueprint>  

If anyone can see where I am going wrong, it would be much appreciated.

【Comments】:

  • The txEH error handler is commented out, but your route still references it; maybe a cut-and-paste issue?
  • Yes, that was just a cut-and-paste typo.

Tags: java apache-camel activemq fabric8


【Solution 1】:

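You should configure the redelivery options on the AMQ broker, because when you use TX it is the broker (not Camel) that is responsible for performing the redelivery.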
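
One way to apply this, sketched from the beans that are commented out in the question's blueprint.xml, is to set an ActiveMQ org.apache.activemq.RedeliveryPolicy (rather than the Camel one) on the ActiveMQConnectionFactory. The bean id amqRedeliveryPolicy is a name made up for this sketch so that it does not clash with the existing Camel redeliveryPolicy bean, and the values are simply the ones from the question, not recommendations:

```xml
<!-- ActiveMQ (not Camel) redelivery policy.
     The id "amqRedeliveryPolicy" is a hypothetical name chosen for this sketch. -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="2" />
    <property name="initialRedeliveryDelay" value="2000" />
    <property name="redeliveryDelay" value="1000" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="2" />
</bean>

<!-- Same connection factory as in the question, with the policy wired in -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${amqBrokerURL}" />
    <property name="userName" value="${amqBrokerUserName}" />
    <property name="password" value="${amqBrokerPassword}" />
    <property name="watchTopicAdvisories" value="false" />
    <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
</bean>
```

With a setup along these lines, a message whose transaction keeps rolling back should be redelivered at most maximumRedeliveries times and then be moved to the dead letter queue (ActiveMQ.DLQ by default).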

【Comments】:

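  • Yes, cheers Claus, that worked for me. Has TransactionErrorHandler been deprecated, though? I would like to persist failed transactions to individual DLQs without having to specify an individualDeadLetterStrategy in my ActiveMQ broker XML. Specifying a deadLetterUri in a TransactionErrorHandler and applying it to a single route or a Camel context would be much easier (imo).
  • It is not deprecated; it is just not part of OSGi Blueprint, because it currently requires the Spring TX API.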
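
For reference, the broker-side alternative mentioned in the comments (an individualDeadLetterStrategy in the ActiveMQ broker XML) typically looks roughly like the fragment below. This is only a sketch: the ">" wildcard (all queues) and the "DLQ." prefix are illustrative choices, not values taken from the question, and the fragment belongs inside the broker element of the broker's own configuration rather than in the Camel blueprint.xml.

```xml
<!-- Fragment for the <broker> element of the ActiveMQ broker configuration -->
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!-- ">" matches every queue; narrow it to specific queues as needed -->
            <policyEntry queue=">">
                <deadLetterStrategy>
                    <!-- Failed messages from queue X go to DLQ.X instead of the shared ActiveMQ.DLQ -->
                    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
                </deadLetterStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>
```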