【发布时间】:2020-08-08 01:47:31
【问题描述】:
我们正在使用 project-reactor 从外部 Web 服务检索一些数据并生成一堆结果对象。
首先,我们需要获取一些触发下一次 Web 服务调用所需的主数据。在主数据可用后,我们会根据主数据的结果检索更多数据。接下来我们必须等待所有 Monos 发出它的结果。然后我们处理所有数据并构建我们的结果对象。
我们在反应流方面没有太多经验。我们的嵌套订阅解决方案有效,但我们相信可能有更好的方法来归档我们想做的事情。
问题 1
Masterdata_A 和 Masterdata_B 可以并行获取,但是如何在不嵌套的情况下以反应方式表达呢? getFluxMasterdata_B 的每个结果都应该与 getMonoMasterdata_A 的一个结果相结合。
问题 2
应以某种方式限制具有两个 Masterdata 的 Tupel,以免 Web 服务因许多数据请求而不堪重负。 1 秒的实际延迟只是一个猜测,似乎可行,但最好定义第一个内部 flatMap 的最大并行执行次数,以便一次最多有 N 个等待的 web 服务调用。
问题 3
将来我们可能必须从 Web 服务获取更多数据来构建 ProcessingResult。是否有定义反应流以使其可读/可理解的最佳实践? 反应式流的嵌套可以还是应该避免(将所有内容都保留在顶层)?
域模型
private static class Masterdata_A
{
private List<MasterdataRecord_A> records;
}
private static class MasterdataRecord_A { /* ... business relevant fields */ }
private static class MasterdataRecord_B { /* ... business relevant fields */ }
private static class Data_A { /* ... business relevant fields */ }
private static class Data_B { /* ... business relevant fields */ }
private static class Data_C { /* ... business relevant fields */ }
private static class ProcessingResult { /* ... business relevant fields */ }
WebserviceImpl
private static class Webservice
{
private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }
private Mono<Data_A> getMonoData_A() { /* fetch data from external webservice */ }
private Mono<Data_B> getMonoData_B() { /* fetch data from external webservice */ }
private Mono<Data_C> getMonoData_C() { /* fetch data from external webservice */ }
}
BusinessServiceImpl
public class BusinessService
{
public void processData(...params...)
{
Webservice webservie = getWebservice();
// As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result than the first inner flatMap should be executed
// to fetch some extra data from the service based on the actual masterdata.
// For building the ProcessingResult we need access to all data available in the actual context.
webservice.getMonoMasterdata_A()
.subscribe((Masterdata_A masterdataA) -> {
webservice.getFluxMasterdata_B()
.delayElements(Duration.ofSeconds(1))
.flatMap((MasterdataRecord_B masterdataB) -> {
Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
// wait for result of all Monos
return Mono.zip(monoA, monoB, monoC);
})
.flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
Data_A dataA = data.getT1();
Data_B dataB = data.getT2();
Data_C dataC = data.getT3();
// create result from masterdataA, masterdataB, dataA, dataB, dataC
ProcessingResult result = ...;
return Mono.just(result);
})
.subscribe(processingResult -> {
// store result to db/filesystem
});
});
}
}
【问题讨论】:
-
masterdata B 是事件流(Flux),masterdata A 只是单个事件(Mono)。你将如何将两者结合起来?您能否更清楚地解释您的问题?您可以使用 .merge() 来并行调用服务。
标签: java project-reactor reactive-streams spring-reactive spring-reactor