【发布时间】:2020-05-15 15:00:02
【问题描述】:
我的项目使用spring-data-mongodb,一切都是被动的。有一个带有使用声明性事务的事务方法的 bean。相关代码片段如下:
@Configuration
public class Config {
@Bean
public ReactiveMongoTransactionManager reactiveMongoTransactionManager() {
return new ReactiveMongoTransactionManager(reactiveMongoDbFactory());
}
...
}
@Service
public class MyService {
private final ReactiveMongoOperations mongoOperations;
...
@Transactional
public Mono<User> saveUser(User user) {
return mongoOperations.insert(user).then(anotherInsertOnMongoOperations()).thenReturn(user);
}
}
这里没有什么不寻常的。
我可以在日志中看到事务在插入文档之前开始,然后提交:
DEBUG o.s.d.m.ReactiveMongoTransactionManager - About to start transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = false, txNumber = 1, error = d != java.lang.Boolean].
DEBUG o.s.d.m.ReactiveMongoTransactionManager - Started transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = true, txNumber = 2, error = d != java.lang.Boolean].
...插入跟随,然后...
DEBUG o.s.d.m.ReactiveMongoTransactionManager - Initiating transaction commit
DEBUG o.s.d.m.ReactiveMongoTransactionManager - About to commit transaction for session [ClientSessionImpl@62de8058 id = {"id": {"$binary": "fye2h5JkRh6yL3MTqtC0Xw==", "$type": "04"}}, causallyConsistent = true, txActive = true, txNumber = 2, error = d != java.lang.Boolean].
但有时,正如我从数据库的内容中看到的那样,只有第一个插入被持久化,第二个丢失。在尝试对这种情况进行建模后,我发现当整个反应性管道被取消时会发生这种“损失”(不是每次,但我能够生成一个高概率重现这种情况的测试)。
我在方法的最后一个操作符之后添加了.doOnSuccessOrError() 和.doOnCancel() 以及一些日志记录。在“正常”情况下(没有取消),doOnSuccessOrError 成功登录。但是当取消发生时,有时日志中的事件顺序是这样的:
- 事务已启动
- 插入发生
- 发生取消
-
什么都没有最终的
doOnSuccessOrError()记录了一些东西,并且在那里的onCancel()中记录了一些东西(因此取消似乎发生在业务方法执行的“中间”)李> - ...但是事务仍然被提交!
TransactionAspectSupport.ReactiveTransactionSupport 包含以下代码(用于本例):
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::commitTransactionAfterReturning)
最后一个参数是onCancel handler。
这意味着在取消时,事务实际上被提交。
问题是:为什么?当由于反应管道外部的原因而发生取消时,事务中的某些操作可能已经完成,而有些还没有(而且永远不会)。在这样的时刻提交会产生违反原子性要求的部分提交。
改为启动回滚似乎更合乎逻辑。但我想spring-tx 的作者是故意做出这个选择的。我想知道这是什么原因?
附:为了验证我的观点,我修补了spring-tx 5.2.3(顺便说一下,这是项目使用的版本),因此代码如下所示:
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::rollbackTransactionDueToCancel)
private Mono<Void> rollbackTransactionDueToCancel(@Nullable ReactiveTransactionInfo txInfo) {
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
if (logger.isDebugEnabled()) {
logger.debug("Rolling transaction back for [" + txInfo.getJoinpointIdentification() + "] due to cancel");
}
return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction());
}
return Mono.empty();
}
(基本上,只是将取消行为更改为回滚),并且有了这个补丁,我的测试不再产生任何不一致的数据。
【问题讨论】:
-
哪个类调用了
saveUser方法? -
@ModusTollens 我不能在这里显示调用者代码,它是一个封闭源代码并且包含太多不相关的信息。但是,最后,业务方法(它甚至命名不同,这里所有的名称都被更改以保护无辜者)是从在
reactor-kafka的KafkaReceiver.receive()上执行concatMap()的代码调用的。由于重试而发生取消(使用retryBackoff())。但这一切似乎与这个问题无关。 -
只是想确保它没有被定义它的同一个类调用 (
MyService)。这会导致问题。 -
你能显示堆栈跟踪吗?
-
@ModusTollens 问题是关于“取消提交”。取消可能是由不同的原因引起的。我什至不确定它实际上是由
retryBackoff()行为引起的,所以我没有堆栈跟踪(这里不相关)。
标签: java spring reactive-programming spring-data-mongodb spring-transactions