【问题标题】:Akka Streams - Backpressure for Source.unfoldAsyncAkka Streams - Source.unfoldAsync 的背压
【发布时间】:2019-03-23 17:56:16
【问题描述】:

我目前正在尝试读取分页的 HTTP 资源。每个页面都是一个多部分文档,如果页面内容更多,则页面的响应会在标题中包含 next 链接。然后,自动解析器可以从最旧的页面开始,然后使用标题逐页读取以构造对下一页的请求。

我使用 Akka Streams 和 Akka Http 来实现,因为我的目标是创建一个流解决方案。我想出了这个(我将在这里只包含代码的相关部分,请随时查看this gist 以获取整个代码):

def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}

所以一般的想法是使用Source.unfoldAsync 来爬取页面并执行HTTP 请求(想法和实现非常接近this answer 中描述的内容。这将创建一个Source[HttpResponse, _] 然后可以被消耗(Unmarshal to Multipart,拆分成单独的部分,...)。

我现在的问题是HttpResponses的消耗可能需要一段时间(如果页面很大,解组需要一些时间,也许最后会有一些数据库请求来持久化一些数据,...) .因此,如果下游速度较慢,我希望Source.unfoldAsync 能够背压。默认情况下,下一个 HTTP 请求将在前一个请求完成后立即启动。

所以我的问题是:有没有办法让Source.unfoldAsync 对下游缓慢产生背压?如果没有,是否有替代方案可以使背压成为可能?

我可以想象一个使用 akka-http 提供的主机级客户端 API 的解决方案,如 here 所述,以及一个循环图,其中第一个请求的响应将用作输入以生成第二个请求,但我还没有尝试过,我不确定这是否可行。


编辑: 在玩了几天并阅读了文档和一些博客之后,我不确定我是否走在正确的轨道上,我假设 Source.unfoldAsync 的背压行为是根本原因。添加更多观察:

  • 启动流时,我看到有几个请求发出。这首先是没有问题的,只要及时消费得到的HttpResponse(参见here的描述)
  • 如果我不更改默认response-entity-subscription-timeout,我会遇到以下错误(我删除了URL):
    [WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
    这会导致 IllegalStateException 终止流:java.lang.IllegalStateException: Substream Source cannot be materialized more than once
  • 我观察到响应的解组是流中最慢的部分,这可能是有道理的,因为响应正文是一个多部分文档,因此相对较大。但是,我希望这部分流向上游发出更少的信号(在我的例子中是Source.unfoldAsync 部分)。这应该会导致发出的请求更少。
  • 一些谷歌搜索将我带到a discussion about an issue that seems to describe a similar problem。他们还讨论了响应处理速度不够快时出现的问题。 associated merge request 将带来文档更改,建议在继续流之前完全使用 HttpResponse。在对该问题的讨论中,也有对whether or not it's a good idea to combine Akka Http with Akka Streams的质疑。因此,也许我必须更改实现以直接在 unfoldAsync 调用的函数内部进行解组。

【问题讨论】:

    标签: scala akka akka-stream akka-http


    【解决方案1】:

    根据Source.unfoldAsyncimplementation,传入的函数只有在拉取源时才会调用:

    def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
    

    因此,如果下游没有拉(背压),则不会调用传入源的函数。

    在您的要点中,您使用runForeach(与runWith(Sink.foreach) 相同)在println 完成后立即拉动上游。所以这里很难注意到背压。

    尝试将您的示例更改为runWith(Sink.queue),这将为您提供SinkQueueWithCancel 作为物化值。然后,除非您在队列中调用pull,否则流将被背压并且不会发出请求。

    请注意,在背压传播到所有流之前,可能会有一个或多个初始请求。

    【讨论】:

    • 我观察到,对 HTTP 响应进行解组的部分流需要相当长的时间。这部分位于 HTTP 调用本身和带有 println 的 Sink 之间。这不应该导致上游减速吗?如果是这样,这将推翻快速 Sink。我还尝试在 HTTP 请求之后添加 .buffer,但这没有任何区别
    • 是的,应该。也许flatMapConcat 在这里行为不端。尝试将其更改为.mapAsync(1)(r => Unmarshal(r).to[Multipart.General])
    • 谢谢,好点子。我认为mapAsync 是我想要的,而不是flatMapConcat 里面有Source.fromFuture。我刚刚尝试过,但它没有任何区别。但是,我发现了一些其他有趣的事情可能会引起人们的兴趣。我将更新我的问题以分享我学到的知识。
    【解决方案2】:

    我想我明白了。正如我在问题的编辑中已经提到的那样,我发现 this comment 是 Akka HTTP 中的一个问题,作者说:

    ...将 Akka http 混合到更大的处理流中根本不是最佳实践。相反,您需要围绕流的 Akka http 部分设置边界,以确保它们始终在允许外部处理流继续之前消耗响应。

    所以我继续尝试:我没有在流的不同阶段执行 HTTP 请求和解组,而是通过flatMaping 将Future[HttpResponse] 直接解组响应为Future[Multipart.General]。这样可以确保直接使用 HttpResponse 并避免 Response entity was not subscribed after 1 second 错误。 crawl 函数现在看起来略有不同,因为它必须返回未编组的 Multipart.General 对象(用于进一步处理)以及原始 HttpResponse(以便能够从标头构造下一个请求):

    def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
      reqOption match {
        case Some(request) =>
          Http().singleRequest(request)
            .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
            .map {
              case tuple@(response, multipart) =>
                if (response.status.isFailure()) Some((None, tuple))
                else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
            }
        case None => Future.successful(None)
      }
    }
    

    因此必须更改其余代码。我创建了another gist,其中包含与原始问题的要点类似的等效代码。

    我期待这两个 Akka 项目能够更好地集成(文档目前没有提到这个限制,而是 HTTP API 似乎鼓励用户一起使用 Akka HTTP 和 Akka Streams),所以这感觉有点像一种解决方法,但它现在解决了我的问题。在将这部分集成到我更大的用例中时,我仍然需要弄清楚我遇到的其他一些问题,但这不是这里问题的一部分。

    【讨论】:

      猜你喜欢
      • 2020-11-05
      • 1970-01-01
      • 2021-01-19
      • 2019-05-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-13
      • 1970-01-01
      相关资源
      最近更新 更多