【问题标题】:Spring data couchbase, unable to delete the document by id using reactive programmingSpring data couchbase,无法使用反应式编程按id删除文档
【发布时间】:2019-01-03 00:47:30
【问题描述】:

最近我们决定在我们的项目中使用 spring-webflux 和 couchbase,我们需要帮助来解决反应式编程中的以下用例

  1. 在 Bucket1 couchbase 中验证并保存请求,(我们使用了 javax.validation 和 spring ReactiveCouchbaseRepository。
  2. 调用外部服务(我们使用webclient调用API。

    • 成功后,

      • 将 AUDIT 文档写入 Bucket2。
      • 获取插入到 Bucket1 中的文档并发送该文档作为响应。
      • 将审核文档写入 Bucket2
    • 一旦失败,

      • 将 AUDIT 文档写入 Bucket2。
      • 删除BUCKET1中插入的​​文档并抛出异常。
      • 将审核文档写入 Bucket2

我们编写了一个服务类,并使用两个存储库类将文档保存到 couchbase,并使用一个 webclient 调用外部服务。

我们的服务类方法业务逻辑如下所示。

    {

    //1. Validate the request and throw the error
    List<String> validationMessages = handler.validate(customerRequest);
    if (validationMessages != null && !validationMessages.isEmpty()) {
        return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
    }

    //generate the id, set it to the request and save it to BUCKET1
    String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
    customerRequest.setcustomerRequestId(customerRequestId);
    customerRequestMono = bucket1Repository.save(customerRequest);


    //2. Call the external service using webclient
    externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

    //2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
    externalServiceResponse.subscribe(response -> {
        //Initialise the success audit bean and save
        //2.1 a) Write Audt to BUCKET2
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
         }, errorResp -> {
        //2.2 a) Write Audt to BUCKET2
        //Initialise the error audit bean and save
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

        //2.2 b)Delete the inserted
        Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
    });

    //Get the loan account id and return the same
    finalResponse = bucket1Repository.findByCustomerId(customerId);
    return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
            .doOnSuccess(resp -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }

            })
            .doOnError(error -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }

            });
}

我们观察到的几个问题是

  1. 在某些情况下,在我们订阅它之前不会保留文档。这是预期的行为吗?我们是否需要订阅才能保存文档?
  2. 出现错误时无法删除文档。
  3. 我也知道我没有遵循上面的纯反应式编程。帮助我在反应式中有效地编写代码。

请大家指点指点

【问题讨论】:

    标签: reactive-programming couchbase spring-webflux


    【解决方案1】:

    获取上面的一段代码:

    externalServiceResponse.subscribe(response -> {
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
         }, errorResp -> {
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
        Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
    });
    

    它有两个反应式编程问题:

    1. 您正在创建未订阅的 Mono,因此它们永远不会执行。
    2. 您不应该在订阅中创建它们,而是使用 flatMap 或 onErrorResume 链接它们,预先订阅。

    这样的东西应该可以解决问题(请原谅我,我还没有测试过,所以你可能需要做一些调整):

    externalServiceResponse
       // If something goes wrong then delete the inserted doc
       .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))
    
       // Always want to save the audit regardless
       .then(bucket2Repository.save(cfeAudit))
    
       .subscribe();
    

    代码中还有其他问题需要修复,例如看起来您想在订阅最终的 Mono 之前将多个 Monos 合并在一起,但希望这可以帮助您入门。

    【讨论】:

      猜你喜欢
      • 2023-04-03
      • 1970-01-01
      • 1970-01-01
      • 2019-04-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-01-25
      相关资源
      最近更新 更多