【问题标题】:Mule ESB 3.3 How to start transactional flows concurrentlyMule ESB 3.3 如何同时启动事务流
【发布时间】:2013-09-23 17:44:40
【问题描述】:

我想同时处理一组消息,但我无法使它们成为事务性的,除非我将 VM 设置为请求-响应...在这种情况下,处理不是并发的。

Mule documentation 声明“Mule 事务配置在同步端点上”,但我不太明白这个限制。
很明显,在您希望成为事务性的流中,不应产生异步流,但(对我而言)不清楚为什么不能(从非 tx 主流)启动任意数量的异步流事务性的。

换句话说,为什么它可以正常工作:

但如果我将虚拟机更改为“单向”,则会失败:

org.mule.transaction.IllegalTransactionStateException: Can only bind "javax.sql.DataSource/java.sql.Connection" type resources

有没有办法解决这个问题?

流的 XML:

<?xml version="1.0" encoding="UTF-8"?>
<mule>
    <spring:beans>
        <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
            <spring:property name="driverName" value="org.h2.Driver" />
            <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
            <spring:property name="user" value="sa" />
            <spring:property name="password">
                <spring:value></spring:value>
            </spring:property>
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="triggerFlow" doc:name="triggerFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow" />
    </flow>

    <flow name="flow" doc:name="flow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>

</mule>

提前致谢。

【问题讨论】:

    标签: asynchronous transactions esb mule


    【解决方案1】:

    Mule 文档指出“Mule 事务是在同步端点上配置的”,但我不太了解这个限制。

    这个限制是因为在 Mule 中和 Spring 中一样,更普遍地在 Java 中,事务是线程绑定的。使用异步流,涉及多个线程,因此无法维护事务-线程关联。

    所以不,您不能拆分/分叉/并行化/异步处理消息,并且在 Mule 中也有事务。

    org.mule.transaction.IllegalTransactionStateException: 只能绑定“javax.sql.DataSource/java.sql.Connection”类型的资源

    但这与第一个问题 IMO 无关:这是因为您通过custom-transaction 强制尝试在 JDBC 事务中注册 VM 端点。这是行不通的。如果要注册异构资源,请使用 XA 事务。

    编辑:根据您在 cmets 中所说,您不想在事务中注册 VM 端点,因此只需在此处注册 JDBC 端点:

    <transactional action="ALWAYS_BEGIN">
        <jdbc:outbound-endpoint exchange-pattern="request-response"
            queryKey="insert" queryTimeout="-1" connector-ref="dbConnector"
            doc:name="insert into test values (1, 'Test 1')">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert"
                value="insert into test values (1, 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response"
            queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector"
            doc:name="insert into test values (2, 'Test 2')">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2"
                value="insert into test values (2, 'Test 2')" />
        </jdbc:outbound-endpoint>
    </transactional>
    

    这适用于 one-way 入站端点。

    【讨论】:

    • 抱歉,我不太明白。我希望每个流(线程)本身都是事务性的,我不想将它们全部包含在同一个事务中。在 Java 中,完全可以生成任意数量的线程,每个线程都在自己的 tx 下。例如。应用服务器可以毫无问题地处理多个客户端请求。
    • 这正是我要说的:一个线程,一个事务。但是如果端点是request-response,Mule 只保证一个线程。
    • 但是您还没有尝试过transactional 元素。我已经查看了我的答案:它对我有用。
    • 哇,你从哪里得到的 标签?它似乎有效,但是: 1. 它在日志中打印 SQLException 两次(因为它正在重试......或者天知道) 2. 无论你放在标签中的任何内容在 Mule Studio 中都不再可见。我的版本似乎表现得更好。
    • 我从 Eclipse XML 编辑器代码辅助中得到它:D 我没有使用 Mule Studio。 1.奇怪。 2. 现在你为什么我使用普通的 Eclipse :D
    【解决方案2】:

    我设法让它工作。我必须添加一个中间异步流来调用同步/发送流:

    我认为这是丑陋和不必要的,将它作为原始帖子调用是完全可以的,但出于我无法理解的原因,Mule 会让你为此跳过箍。

    以下是流的 XML:

    <?xml version="1.0" encoding="UTF-8"?>
        <mule>
        <spring:beans>
            <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
                <spring:property name="driverName" value="org.h2.Driver" />
                <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
                <spring:property name="user" value="sa" />
                <spring:property name="password">
                    <spring:value></spring:value>
                </spring:property>
            </spring:bean>
    
            <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />
    
        </spring:beans>
    
        <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />
    
        <flow name="triggerFlow" doc:name="triggerFlow">
            <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
            <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
            <collection-splitter doc:name="Collection Splitter"/>
            <vm:outbound-endpoint exchange-pattern="one-way" path="async" doc:name="async" />
        </flow>
        <flow name="simpletransactionFlow1" doc:name="simpletransactionFlow1">
            <vm:inbound-endpoint exchange-pattern="one-way" path="async" doc:name="async"/>
            <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow"/>
        </flow>
    
        <flow name="flow" doc:name="flow">
            <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
                <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
            </vm:inbound-endpoint>
            <logger message="#[groovy:Thread.currentThread().getName()], payload=#[payload]" level="INFO" doc:name="Logger"/>
            <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
                <jdbc:transaction action="ALWAYS_JOIN" />
                <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
            </jdbc:outbound-endpoint>
            <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
                <jdbc:transaction action="ALWAYS_JOIN" />
                <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
            </jdbc:outbound-endpoint>
        </flow>
    
    </mule>
    

    【讨论】:

    • 我保证我的答案是正确的 :) 如果您同意,请检查并修改接受!
    • 为了让 VM 参与事务,您不应该使用 XA 事务管理器吗?
    【解决方案3】:

    这在Mule documentation 中进行了讨论。它可能需要稍微麻烦一些,但还不错,实际上只需要一个 VM 传输,而不是两个:

    我更改它是因为我想使用 TCP 而不是 HTTP,但它基本上是相同的示例。

    第一个流接收未处理的 TCP,用它做一些事情(这里只是一个字节数组到字符串,但可能你有验证或其他东西),将输入推送到 VM 连接器,然后将输入回显到 TCP溪流。在 VM 连接器保证它可以释放其消息的 TCP 源之后返回,如果它正在执行一些低级别的保证传递。

    VM 连接器是单向的,并且有一个与之连接的持久存储,因此它不会丢失消息。这在 Mule Studio 3.5 中显示了一个错误,但它工作正常。

    然后中间流接管,并包含一个事务,因此除非业务逻辑子流成功完成,否则 VM 连接器不会释放消息。

    子流程终于运行了;目前只是 Thread.sleep()ing 5 秒钟,所以我可以看到它一切正常。 Telnet 进入会立即回显到 Telnet 控制台,然后在五秒后再次回显。

    希望对您有所帮助!我只花了几个小时和 Mule 在一起,但这似乎是正确的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-01-07
      • 1970-01-01
      • 2017-01-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多