【问题标题】:Is connection pooling in akka-http using the source queue Implementation thread safe?akka-http 中使用源队列实现线程的连接池是否安全?
【发布时间】:2017-03-25 07:32:55
【问题描述】:

参考下面提到的实现:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

从多个线程提供队列 http 请求是否是线程安全的? 如果不是,那么实现此类要求的最佳方法是什么?也许使用专门的演员?

【问题讨论】:

    标签: scala connection-pooling akka-stream akka-http


    【解决方案1】:

    不,它不是线程安全的,根据 api doc: SourceQueue that current source is materialized to is for single thread usage only.

    专门的演员可以工作,但如果可以的话,使用Source.actorRef (doc link) 代替Source.queue 会更容易。

    一般来说,Source.actorRef 的缺点是没有背压,但是当你使用OverflowStrategy.dropNew 时,很明显你不会期望背压。因此,您可以使用 Source.actorRef 获得相同的行为。

    【讨论】:

    • 非常感谢您的评论。我需要在缓冲区溢出的情况下发出某种故障信号,例如 return Future.failed(BufferFlowException),据我了解,这无法使用 Source.actorRef 实现。源队列使用 QueueOfferResult API 符合描述。
    • @Nik 我仍然认为我正确回答了原始问题。至于您的新要求,我确实会实现一个专门处理队列的 Actor。它需要使用queue.offer(???) pipeTo self 添加到队列中,然后它能够​​通过在其receive 方法中处理QueueOfferResult 的各种子类型来对失败做出反应。
    • 为了防止actor邮箱溢出,你可以使用NonBlockingBoundedMailbox (doc.akka.io/docs/akka/current/scala/mailboxes.html)。
    • 最新文档删除了“仅限单线程使用”行:doc.akka.io/api/akka/2.5/akka/stream/scaladsl/…
    • 没错,它的删除可以在这里追踪:github.com/akka/akka/issues/23081
    【解决方案2】:

    正如@frederic-a 所说,SourceQueue 不是线程安全的解决方案。

    也许一个合适的解决方案是使用MergeHub(有关更多详细信息,请参阅docs)。这有效地允许您分两个阶段运行图表。

    1. 从您的集线器到您的水槽(具体化为水槽)
    2. 将点 1 处具体化的接收器分发给您的用户。 Sinks 实际上是为分布式设计的,所以这是非常安全的。

    根据MergeHub 的行为,此解决方案在背压方面是安全的

    如果消费者跟不上,那么所有的生产者都是 背压。

    下面的代码示例:

    val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
      MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
      .via(poolClientFlow)
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))(Keep.left)
      .run()
    
    // on the user threads
    
    val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
    source.runWith(reqSink)
    

    【讨论】:

    • 有官方文档支持吗?我仍然不能完全确定,因为网络上有很多版本,有些声称是,有些不是。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-07-06
    • 2017-02-03
    • 2012-05-30
    • 2013-07-24
    • 2019-01-17
    • 2010-12-10
    • 1970-01-01
    相关资源
    最近更新 更多