【发布时间】:2020-07-30 08:08:07
【问题描述】:
我想将多个相同类型的Flux 组合在一起。在订阅时,它们应该并行执行。订阅方法应该可以限制请求的数量。
我尝试了Flux.merge(..) 和Flux.concat(..)。后者似乎强制执行顺序请求生成,即使在并行调度程序上发布时,第一个以急切的方式请求所有元素并且仅尊重 onNext 事件的 take(n) 方法,但不尊重 onRequest 事件。
我准备了一个最小的例子。三个Flux 都应在Schedulers.boundedElastic() 上执行繁重的I/O 工作。每个人都记录他们的onRequest 事件。
Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));
使用Flux.concat 和take(2) 进行处理
Flux.concat(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
结果:
00:16:11.980 [oundedElastic-1] d.f.B.Application : onRequest 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onNext 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onRequest 2
00:16:15.982 [oundedElastic-3] d.f.B.Application : onNext 2
两个onRequest 和两个onNext 事件被传播。请求延迟了两秒但预期并行。
使用Flux.map 和take(2) 进行处理
Flux.merge(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
结果
00:21:58.301 [oundedElastic-1] d.f.B.Application : onRequest 1
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
三个 onRequest 即使只有两个在预期的情况下也会发出事件。 take(n) 方法只是限制了发出的 onNext 事件。执行按预期并行完成。
未知方法的预期结果
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
只有两个onRequest 事件被发出并完成
问题
是否可以将这三个Flux 组合在一起,使其可以同时计算但在请求行为中受到限制(take(n)?)?在我的真实世界场景中,请求会导致 WebClient 调用 WebService,从而启动耗时的 i/o 任务。因此,即使我的应用程序只接收到两个 onNext 事件,第三个发出的 onRequest 会导致进入涅槃的请求。
【问题讨论】:
-
预期输出是什么?
-
您也可以发布您要解决的实际问题。这看起来可能是 X -> Y 问题。
-
我正在尝试响应式编程方法,所以我想学习它。我用预期的输出更新了我的问题。
-
我想不出这会有用的情况,是否存在真正的问题,或者您只是想知道您是否可以这样做?同样在您的预期结果中,您不会期望 1,2 1,2 而不是 2,3 3,2
-
我不需要解决这个问题来为我的实际问题找到解决方案,因为我还可以为每个 API 创建一个
Flux<String>并具有自己的限制并一个接一个地订阅它们。但是,如果我有一个有效的问题要解决,或者我误解了 project reactor 中的一些概念,这更符合我的兴趣。
标签: java reactive-programming spring-webflux project-reactor