【发布时间】:2021-07-23 22:39:38
【问题描述】:
我正在使用 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时仅向一个订阅者发出(UnicastProcessor),但它也可以在订阅时发出默认值(ReplayProcessor)。这是与真实案例类似的情况:
Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + " emiting next"));
for(int i = 0; i < 5; i++) {
new Thread(() -> {
monoC.flatMap(unused ->
webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(new Date() + " - " + Thread.currentThread().getName() + " finished processing");
})
).subscribe();
}).start();
}
那是打印:
emiting next
...
emiting next
finished processing
...
finished processing
相反,我希望它打印出来:
emiting next
finished processing
...
emiting next
finished processing
更新,对真实案例场景的更多说明:
真实的情况是:我有一个 Spring WebFlux 应用程序,它的作用类似于中继,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。然后,如果我这样做,该微服务可以回复 429走得太快,并且在标题中我必须等待多长时间才能再次重试。我已经使用 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻止,直到 Mono.delay 完成。
我正在尝试使用 Replay Sink 来实现这一点,因此在收到 429 后,我向 sink 发出“false”,并且在 Mono.delay 结束后,它向 sink 发出 true,所以如果在同时,当我在 A 上收到任何进一步的请求时,它可以过滤掉所有的错误并等待发出一个真值。
最重要的问题是,当我收到太多请求在 A 上中继时,微服务 B 开始响应缓慢,并且变得超载。因此,我想限制接收器发射的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出。
【问题讨论】:
-
我不太了解您的用户案例。
I would like it to accept only one subscriber, so following subscribers are hold until first completes.是什么意思,请描述您的实际用户案例。 -
所以,如果您看到我提供的示例,我想序列化(按并发 1 顺序)从我的应用程序的不同位置完成的所有 GET 请求。
-
我可以阅读您的代码,但我不关心您的代码,我想知道用户案例。你的意图是什么,你在建造什么,你想做什么。因为在反应式中我们通常不使用
Thread并且代码很乱,所以我不明白你在做什么。如果我不明白您想要做什么,也许我们可以帮助您以更正确的反应方式构建它。 -
你的代码有几个很奇怪的地方,为什么布尔值的
latestOrDefault?为什么你flatMaping 超过monoC而不使用布尔值?您的代码在如何使用接收器方面还差得很远 -
我建议你阅读 reactor 文档中的 sink 部分,其中描述了如何使用 sink。 projectreactor.io/docs/core/release/reference/#processors
标签: java spring-webflux project-reactor reactor