【发布时间】:2023-03-23 20:40:02
【问题描述】:
我正在使用host-level API with a queue。
private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
.viaMat(poolFlow)(Keep.both)
.toMat(
Sink.foreach({
case ((Success(resp), p)) =>
p.success(resp)
case ((Failure(e), p)) => p.failure(e)
})
)(Keep.left)
.run()
我在连接池中有很多连接请求竞速,但我收到以下错误:
java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
我尝试添加 .async 但背压仍然没有起作用。上面的错误是什么意思以及如何着手调查问题?
【问题讨论】:
标签: scala akka akka-stream akka-http