Spring WebClient 是一个无阻塞 IO http 客户端,而 ReactorClientHttpConnector 是一个基于 Reactor-Netty 的实现。说我可以建议不要担心连接池,而是专注于完全无阻塞的服务调用。使用这种技术成功的关键是专注于一个完整的无阻塞服务调用链,该模型不涉及每个请求一个线程,如果你有一些东西阻塞你的代码,它就像一个浏览器或节点 js 开发阻止任何东西。我知道这并不常见,但基于事件循环模型的基本实现迫使您考虑完全不同的模型。
我可以告诉你,通常在基于 Netty 的实现中,你有许多与核心数量相同的事件循环,当然它是可配置的,但我认为这已经足够了,记住反应式的力量并且无阻塞 IO 编程是在您的所有代码片段中包含无阻塞 io,并为每个处理器添加更多事件循环将为您带来一些并发性,而每个处理器具有一个事件循环将使您能够完全并行使用您的处理器。
我希望这种反思可以帮助你
提示。对于您的 http 服务调用超时,您可以在下面的测试中添加您喜欢的超时:
@Test
@WithMockUser(username = "user")
fun `read basic personal details data`() {
personalDetailsRepository.save("RESUME_ID", TestCase.personalDetails()).toMono().block();
val expectedJson = TestCase.readFileAsString("personal-details.json")
webClient.get()
.uri("/resume/RESUME_ID/personal-details")
.accept(MediaType.APPLICATION_JSON)
.exchange().toMono().timeout(Duration.ofMinutes(1))
}
更新
考虑到应用程序级别的限制请求,当然可以使用背压功能来处理有时可能太大而无法可靠处理的数据流,或者如果流响应像使用 Flux limitRate() 操作符的 Flux 可以参考官方文档:
/**
* Ensure that backpressure signals from downstream subscribers are split into batches
* capped at the provided {@code prefetchRate} when propagated upstream, effectively
* rate limiting the upstream {@link Publisher}.
* <p>
* Note that this is an upper bound, and that this operator uses a prefetch-and-replenish
* strategy, requesting a replenishing amount when 75% of the prefetch amount has been
* emitted.
* <p>
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* amount.
* <p>
* Equivalent to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() }.
* Note that the {@code prefetchRate} is an upper bound, and that this operator uses a
* prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the
* prefetch amount has been emitted.
*
* @param prefetchRate the limit to apply to downstream's backpressure
*
* @return a {@link Flux} limiting downstream's backpressure
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
*/
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
}
表示我建议使用此功能,不要尝试像连接限制那样以强制方式限制数据的消耗。反应式编程和无阻塞 IO 的优势之一在于令人难以置信的资源使用效率和限制资源使用似乎是对范式精神的反感