【问题标题】:Multiple transaction synchronization factories in the same flow同一流中的多个事务同步工厂
【发布时间】:2021-10-20 14:04:42
【问题描述】:

我有以下流程:SFTP 入站流适配器读取一个文件,该文件是一个 JSON。那么:

  • 事务同步直接在该适配器上注册,以便在事务提交时重命名输入文件(因为我已经完成了)
  • 读取的文件被转换为带有json-to-object 的 DTO,并被扔给一个转换器,该转换器将其转换为其他东西
  • 这个“其他东西”应该放在另一个频道上,但只能在事务提交之后

这是流程的精髓:

<int-sftp:inbound-streaming-channel-adapter
  session-factory="mySftpSessionFactory"
  channel="channel1"
  filename-pattern="*.JSON"
  remote-directory-expression="'/mypath'"
  max-fetch-size="10">
  <int:poller max-messages-per-poll="1" fixed-delay="1"
    time-unit="MINUTES" error-channel="unexpectedErrorChannel">
    <int:transactional
      synchronization-factory="synchFactory1"
      transaction-manager="transactionManager" />
    </int:poller>
</int-sftp:inbound-streaming-channel-adapter>

<int:transaction-synchronization-factory id="synchFactory1">
  <int:after-commit channel="onCommitRemoteFileRenameChannel" />
</int:transaction-synchronization-factory>

<int:channel id="channel1" />

<int:chain input-channel="channel1" output-channel="nullChannel">
  <int:stream-transformer charset="UTF-8" />
  <int:json-to-object-transformer type="com.example.MyDto" />
  <int:transformer ref="myTransformer">
    <int:transactional synchronization-factory="synchFactory2"
      transaction-manager="transactionManager" />
  </int:transformer>
</int:chain>

<int:transaction-synchronization-factory id="synchFactory2">
  <int:after-commit channel="onCommitSecondFlowChannel" />
</int:transaction-synchronization-factory>

<int:channel id="onCommitRemoteFileRenameChannel" />
<int-sftp:outbound-gateway
  session-factory="mySftpSessionFactory"
  request-channel="onCommitRemoteFileRenameChannel" 
  command="mv"
  expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))"
  rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))"
  requires-reply="false"
  reply-channel="nullChannel" />

在转换器内部声明的第二个&lt;int:transactional&gt; 并不意味着实际打开一个新事务(默认传播应该是必需的,因此它应该加入轮询器打开的主事务),而只是添加第二个同步这会将转换器返回的消息触发到名为onCommitSecondFlowChannel 的通道以进行进一步处理。

但是,这似乎无法正常工作:虽然正确执行了远程文件重命名,但没有应用第二次同步,因为我没有从转换器获得任何消息到onCommitSecondFlowChannel。 我尝试调试,发现 &lt;int:transacationl-synchronization-factory&gt;s 都已解析,但第二个方法 ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder) 从未执行。

有没有一种方法可以获得此结果,而无需在转换器之后声明服务激活器和网关以将消息放置在第二个流通道上?

【问题讨论】:

    标签: transactions spring-integration


    【解决方案1】:

    我认为您使用额外的 tx-sync 使您的流程过于复杂。 它真的不是为了在流程中间做某事而设计的。 从我们开始事务的角度来看,它是一个全局钩子。由于您不在该变压器中,因此无法应用。

    我们可能会在将来对其进行修改,因为我看到了您的感受,如果此时无法启动新的 TX,那么这样的 tx-sync 应该加入现有的 TX 是非常合乎逻辑的。

    无论如何我都不会这样做,因为它会使流程不直观。

    我看到你在&lt;transformer&gt; 之后有这个&lt;int:chain input-channel="channel1" output-channel="nullChannel"&gt; - nullChannel 作为输出。

    那么,如何完全删除 &lt;transactional&gt; 并将 onCommitSecondFlowChannel 作为 &lt;chain&gt; 的输出?无论如何,这似乎是您流程的结束,因此无需将某些事情推迟到交易结束。

    【讨论】:

    • 感谢 Artem 的洞察。嗯,流的第一个版本确实是你描述的那个,但是下游流涉及多个单独的事务:如果一个这样的 tx 回滚,将onCommitSecondFlowChannel 放在链输出上会带来“tx 设置为仅回滚”,因为poller (outer) tx 是联合的。所以我尝试了一个调度程序来打破轮询器 tx 边界,但是我有乐观锁定异常,因为两个流都将相同的实体更改为彼此太近。因此,序列化流程的想法(如果第二个仅在第一个提交时执行,那很好)
    • 然后我将在转换器之后添加一个服务激活器,它在事务提交后调用第二个流的网关:这是一些我希望避免的代码,但它似乎是必要的。
    猜你喜欢
    • 1970-01-01
    • 2016-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-09
    相关资源
    最近更新 更多