【发布时间】:2020-08-15 10:29:47
【问题描述】:
您能否解释一下 HttpClient.response() 返回的 Flux/Mono 中究竟发生了什么?我认为 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8 异常结束。如果我用Mono.fromCallable { } 替换对testRequest() 的调用,它会按预期工作(一项一项处理)。
我错过了什么?
测试代码:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}
【问题讨论】:
标签: kotlin project-reactor reactive-streams reactor-netty