【问题标题】:How to Enable Source.Queue Backpressure如何启用 Source.Queue 背压
【发布时间】:2023-03-23 20:40:02
【问题描述】:

我正在使用host-level API with a queue

  private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
    .viaMat(poolFlow)(Keep.both)
    .toMat(
      Sink.foreach({
        case ((Success(resp), p)) =>
          p.success(resp)
        case ((Failure(e), p)) => p.failure(e)
      })
    )(Keep.left)
    .run()

我在连接池中有很多连接请求竞速,但我收到以下错误:

 java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
    at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
    at akka.actor.ActorCell.invoke(ActorCell.scala:496)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我尝试添加 .async 但背压仍然没有起作用。上面的错误是什么意思以及如何着手调查问题?

【问题讨论】:

    标签: scala akka akka-stream akka-http


    【解决方案1】:

    您已经在使用Source.queue 对象方法构造Source,因此我认为不可能直接对调用queue.offer 的任何功能施加背压。但是,您的问题可能会以不同的方式解决。

    不同的OverflowStrategy

    您可以将策略更改为 OverflowStrategy.dropHeadOverflowStrategy.dropTail。如果您的 queueSizequeue.offer 调用的速率相比足够大,那么这可能会满足您的需求。

    【讨论】:

    • 我不能丢失消息。如果它不起作用,背压策略的意义何在?
    • @Rabzu 我不知道使用 Source.queue 的背压策略的示例。如果背压对您来说很重要,那么为什么首先使用Source.queue
    • 有什么选择?
    • 我有一个服务,我想使用 akka-http 创建一个客户端。我正在创建一个线程池,它对来自物化 akka-http 客户端流的请求进行背压。文档清楚地说明了具有队列guards itself a) by applying backpressure to all request streams connected to the cached pool 的主机级 API,通过对连接到缓存池的所有请求流施加背压)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-05
    • 2018-10-23
    • 2018-08-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-31
    相关资源
    最近更新 更多