【问题标题】:How to Schedule multiple Flux in parallel and limit onNext and onRequest Events如何并行调度多个 Flux 并限制 onNext 和 onRequest 事件
【发布时间】:2020-07-30 08:08:07
【问题描述】:

我想将多个相同类型的Flux 组合在一起。在订阅时,它们应该并行执行。订阅方法应该可以限制请求的数量。

我尝试了Flux.merge(..)Flux.concat(..)。后者似乎强制执行顺序请求生成,即使在并行调度程序上发布时,第一个以急切的方式请求所有元素并且仅尊重 onNext 事件的 take(n) 方法,但不尊重 onRequest 事件。

我准备了一个最小的例子。三个Flux 都应在Schedulers.boundedElastic() 上执行繁重的I/O 工作。每个人都记录他们的onRequest 事件。

Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));

使用Flux.concattake(2) 进行处理

Flux.concat(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

结果:

00:16:11.980   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onNext 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onRequest 2
00:16:15.982   [oundedElastic-3] d.f.B.Application                        : onNext 2

两个onRequest 和两个onNext 事件被传播。请求延迟了两秒但预期并行

使用Flux.maptake(2) 进行处理

Flux.merge(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

结果

00:21:58.301   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

三个 onRequest 即使只有两个在预期的情况下也会发出事件take(n) 方法只是限制了发出的 onNext 事件。执行按预期并行完成。

未知方法的预期结果

00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

只有两个onRequest 事件被发出并完成

问题

是否可以将这三个Flux 组合在一起,使其可以同时计算但在请求行为中受到限制(take(n)?)?在我的真实世界场景中,请求会导致 WebClient 调用 WebService,从而启动耗时的 i/o 任务。因此,即使我的应用程序只接收到两个 onNext 事件,第三个发出的 onRequest 会导致进入涅槃的请求。

【问题讨论】:

  • 预期输出是什么?
  • 您也可以发布您要解决的实际问题。这看起来可能是 X -> Y 问题。
  • 我正在尝试响应式编程方法,所以我想学习它。我用预期的输出更新了我的问题。
  • 我想不出这会有用的情况,是否存在真正的问题,或者您只是想知道您是否可以这样做?同样在您的预期结果中,您不会期望 1,2 1,2 而不是 2,3 3,2
  • 我不需要解决这个问题来为我的实际问题找到解决方案,因为我还可以为每个 API 创建一个 Flux&lt;String&gt; 并具有自己的限制并一个接一个地订阅它们。但是,如果我有一个有效的问题要解决,或者我误解了 project reactor 中的一些概念,这更符合我的兴趣。

标签: java reactive-programming spring-webflux project-reactor


【解决方案1】:

我也是 project-reactor 的新手,但我想出了这个解决方法,有些人可能会称之为 hack:

Flux.fromIterable(Lists.partition(Arrays.asList(f1, f2, f3), 2))
        .take(1)
        .flatMap(partition -> Flux.merge(partition)
                .doOnNext(i -> LOGGER.info("onNext {}", i)))
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

它为我打印了这个:

2020-04-17 01:07:37.658  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 1
2020-04-17 01:07:37.659  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 2
2020-04-17 01:07:39.664  INFO 59488 --- [oundedElastic-3] com.example.demo.SomeService1            : onNext 2
2020-04-17 01:07:39.667  INFO 59488 --- [oundedElastic-2] com.example.demo.SomeService1            : onNext 1

【讨论】:

  • 感谢您分享您的方法。在第一行中,您将列表划分为最大大小为两个的子列表。因此,在这种情况下,采取一个将起作用。但是,如果您将大小划分为 1,您的方法也确实有效。因此 take() 方法可以定义发出多少请求。不幸的是,我想为这个问题找到一个反应版本。
  • 我感觉doOnRequest 方法是为不同的目的而设计的。如果您只想在每次发出元素时执行某些操作,那么您可能应该使用使用 Mono.fromCallable(() -&gt; heavyIoOperation()) 的惰性(冷)发布者。如果您希望仅在发出第一个元素时完成,您可以使用布尔值来跟踪它是否已发出。例如,您可以这样做:Mono.fromCallable(() -&gt; myBoolean).filter(b -&gt; b).switchIfEmpty(() -&gt; heavyIoOperation())
  • 在我的真实案例中,我创建了一个 Flux,其中包含遍历 API 的连续 WebClient 调用。我发现,似乎在每个 doOnRequest 事件上都会发出 GET-Request。所以我认为这是一个最小的例子,因为我的问题是,发出的 GET 请求比预期的要多。我需要用Mono.fromCallable(..) 看看你的例子。我使用Mono.defer(..) 也有类似的想法,但不幸的是它没有发现任何改进。
猜你喜欢
  • 2021-10-21
  • 2021-05-12
  • 2018-09-08
  • 1970-01-01
  • 2018-10-12
  • 2015-04-01
  • 2019-02-06
  • 1970-01-01
  • 2020-11-12
相关资源
最近更新 更多