【问题标题】:How to control flow of a transactional stream with Spring Data R2DBC?如何使用 Spring Data R2DBC 控制事务流的流程?
【发布时间】:2019-11-06 12:52:42
【问题描述】:

对事务流的支持似乎一直是recently implemented,但由于其新颖性,代码示例并不多。

有人可以展示一个事务流的示例,它执行一系列数据库插入,然后在成功时返回一些值,但在插入之间有一个中游检查点来测试某些条件并可能回滚事务并返回不同的值,具体取决于检查点结果?

【问题讨论】:

  • 你是指checkpoint的保存点吗?
  • 抱歉,我不知道有 checkpoint 实体。我只是指一般意义上的检查点,即中途发生的一些测试,可能会引导事务被取消并且流返回一些替代结果。
  • 虽然你的评论让我很好奇看到一个保存点的例子,它根据一些中游条件提交事务的一部分,能够在某些下游条件下回滚整个事务或仅回滚在其他条件下直到保存点。

标签: r2dbc spring-data-r2dbc


【解决方案1】:

响应式事务遵循与命令式事务相同的模式:

  1. 在运行任何用户空间命令之前启动事务
  2. 运行用户空间命令
  3. 提交(或回滚)

这里需要注意的几个方面:连接总是与反应序列的具体化相关联。我们从Thread-bound 连接中得知,该连接绑定到命令式编程中的执行,转化为响应式编程中的具体化。

因此每个(并发)执行都会分配一个连接。

Spring Data R2DBC 不支持保存点。请看以下代码示例,该示例说明了提交或回滚的决定:

DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

TransactionalOperator transactionalOperator = TransactionalOperator
        .create(new R2dbcTransactionManager(connectionFactory));

transactionalOperator.execute(tx -> {

    Mono<Void> insert = databaseClient.execute("INSERT INTO legoset VALUES(…)")
            .then();

    Mono<Long> select = databaseClient.execute("SELECT COUNT(*) FROM legoset")
            .as(Long.class)
            .fetch()
            .first();

    return insert.then(select.handle((count, sink) -> {

        if(count > 10) {
            tx.setRollbackOnly();
        }

    }));
}).as(StepVerifier::create).verifyComplete();

这里值得注意的方面是:

  1. 我们使用的是TransactionalOperator 而不是@Transactional
  2. .handle() 中的代码调用setRollbackOnly() 来回滚事务。

使用@Transactional,您通常会使用异常来表示回滚条件。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-29
    • 1970-01-01
    • 1970-01-01
    • 2019-01-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多