【发布时间】:2019-01-03 00:47:30
【问题描述】:
最近我们决定在我们的项目中使用 spring-webflux 和 couchbase,我们需要帮助来解决反应式编程中的以下用例
- 在 Bucket1 couchbase 中验证并保存请求,(我们使用了 javax.validation 和 spring ReactiveCouchbaseRepository。
-
调用外部服务(我们使用webclient调用API。
-
成功后,
- 将 AUDIT 文档写入 Bucket2。
- 获取插入到 Bucket1 中的文档并发送该文档作为响应。
- 将审核文档写入 Bucket2
-
一旦失败,
- 将 AUDIT 文档写入 Bucket2。
- 删除BUCKET1中插入的文档并抛出异常。
- 将审核文档写入 Bucket2
-
我们编写了一个服务类,并使用两个存储库类将文档保存到 couchbase,并使用一个 webclient 调用外部服务。
我们的服务类方法业务逻辑如下所示。
{
//1. Validate the request and throw the error
List<String> validationMessages = handler.validate(customerRequest);
if (validationMessages != null && !validationMessages.isEmpty()) {
return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
}
//generate the id, set it to the request and save it to BUCKET1
String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
customerRequest.setcustomerRequestId(customerRequestId);
customerRequestMono = bucket1Repository.save(customerRequest);
//2. Call the external service using webclient
externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);
//2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
externalServiceResponse.subscribe(response -> {
//Initialise the success audit bean and save
//2.1 a) Write Audt to BUCKET2
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
}, errorResp -> {
//2.2 a) Write Audt to BUCKET2
//Initialise the error audit bean and save
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
//2.2 b)Delete the inserted
Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
});
//Get the loan account id and return the same
finalResponse = bucket1Repository.findByCustomerId(customerId);
return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
.doOnSuccess(resp -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
})
.doOnError(error -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
});
}
我们观察到的几个问题是
- 在某些情况下,在我们订阅它之前不会保留文档。这是预期的行为吗?我们是否需要订阅才能保存文档?
- 出现错误时无法删除文档。
- 我也知道我没有遵循上面的纯反应式编程。帮助我在反应式中有效地编写代码。
请大家指点指点
【问题讨论】:
标签: reactive-programming couchbase spring-webflux