【发布时间】:2021-10-20 14:04:42
【问题描述】:
我有以下流程:SFTP 入站流适配器读取一个文件,该文件是一个 JSON。那么:
- 事务同步直接在该适配器上注册,以便在事务提交时重命名输入文件(因为我已经完成了)
- 读取的文件被转换为带有
json-to-object的 DTO,并被扔给一个转换器,该转换器将其转换为其他东西 - 这个“其他东西”应该放在另一个频道上,但只能在事务提交之后
这是流程的精髓:
<int-sftp:inbound-streaming-channel-adapter
session-factory="mySftpSessionFactory"
channel="channel1"
filename-pattern="*.JSON"
remote-directory-expression="'/mypath'"
max-fetch-size="10">
<int:poller max-messages-per-poll="1" fixed-delay="1"
time-unit="MINUTES" error-channel="unexpectedErrorChannel">
<int:transactional
synchronization-factory="synchFactory1"
transaction-manager="transactionManager" />
</int:poller>
</int-sftp:inbound-streaming-channel-adapter>
<int:transaction-synchronization-factory id="synchFactory1">
<int:after-commit channel="onCommitRemoteFileRenameChannel" />
</int:transaction-synchronization-factory>
<int:channel id="channel1" />
<int:chain input-channel="channel1" output-channel="nullChannel">
<int:stream-transformer charset="UTF-8" />
<int:json-to-object-transformer type="com.example.MyDto" />
<int:transformer ref="myTransformer">
<int:transactional synchronization-factory="synchFactory2"
transaction-manager="transactionManager" />
</int:transformer>
</int:chain>
<int:transaction-synchronization-factory id="synchFactory2">
<int:after-commit channel="onCommitSecondFlowChannel" />
</int:transaction-synchronization-factory>
<int:channel id="onCommitRemoteFileRenameChannel" />
<int-sftp:outbound-gateway
session-factory="mySftpSessionFactory"
request-channel="onCommitRemoteFileRenameChannel"
command="mv"
expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))"
rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))"
requires-reply="false"
reply-channel="nullChannel" />
在转换器内部声明的第二个<int:transactional> 并不意味着实际打开一个新事务(默认传播应该是必需的,因此它应该加入轮询器打开的主事务),而只是添加第二个同步这会将转换器返回的消息触发到名为onCommitSecondFlowChannel 的通道以进行进一步处理。
但是,这似乎无法正常工作:虽然正确执行了远程文件重命名,但没有应用第二次同步,因为我没有从转换器获得任何消息到onCommitSecondFlowChannel。
我尝试调试,发现 <int:transacationl-synchronization-factory>s 都已解析,但第二个方法 ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder) 从未执行。
有没有一种方法可以获得此结果,而无需在转换器之后声明服务激活器和网关以将消息放置在第二个流通道上?
【问题讨论】:
标签: transactions spring-integration