【问题标题】:Why Flux.flatMap() doesn't wait for completion of inner publisher?为什么 Flux.flatMap() 不等待内部发布者完成?
【发布时间】:2020-08-15 10:29:47
【问题描述】:

您能否解释一下 HttpClient.response() 返回的 Flux/Mono 中究竟发生了什么?我认为 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8 异常结束。如果我用Mono.fromCallable { } 替换对testRequest() 的调用,它会按预期工作(一项一项处理)。

我错过了什么?

测试代码:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext { (obj, res) ->
                    println("Created request for: $obj ${res.length} characters")
                }
                .collectList().block()!!
    }

    fun testRequest(): Mono<String> {
        return client.get()
                .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
                .responseContent()
                .reduce(StringBuilder(), { sb, buf ->
                    val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
                    sb.append(str)
                })
                .map { it.toString() }
    }
}

【问题讨论】:

    标签: kotlin project-reactor reactive-streams reactor-netty


    【解决方案1】:

    当您像这样ConnectionProvider.create("meh", 4) 创建ConnectionProvider 时,这意味着连接池的最大连接数为 4,最大挂起请求数为 8。有关此内容的更多信息,请参见 here

    当您使用flatMap 时,这意味着Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave 请参阅here 了解更多信息。

    所以发生的情况是您尝试同时运行所有请求。

    所以你有两个选择:

    • 如果您想使用flatMap,请增加待处理请求的数量。
    • 如果您想保留未决请求的数量,您可以考虑使用例如concatMap 而不是flatMap,这意味着Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation。查看更多here 相关信息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-06-08
      • 2019-12-18
      • 2012-12-13
      • 2016-07-21
      • 1970-01-01
      • 2021-12-08
      • 1970-01-01
      • 2017-08-12
      相关资源
      最近更新 更多