【发布时间】:2019-03-23 17:56:16
【问题描述】:
我目前正在尝试读取分页的 HTTP 资源。每个页面都是一个多部分文档,如果页面内容更多,则页面的响应会在标题中包含 next 链接。然后,自动解析器可以从最旧的页面开始,然后使用标题逐页读取以构造对下一页的请求。
我使用 Akka Streams 和 Akka Http 来实现,因为我的目标是创建一个流解决方案。我想出了这个(我将在这里只包含代码的相关部分,请随时查看this gist 以获取整个代码):
def read(request: HttpRequest): Source[HttpResponse, _] =
Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)
val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
.flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
.flatMapConcat(_.parts)
....
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
case Some(req) =>
Http().singleRequest(req).map { response =>
if (response.status.isFailure()) Some((None, response))
else nextRequest(response, HttpMethods.GET)
}
case None => Future.successful(None)
}
所以一般的想法是使用Source.unfoldAsync 来爬取页面并执行HTTP 请求(想法和实现非常接近this answer 中描述的内容。这将创建一个Source[HttpResponse, _] 然后可以被消耗(Unmarshal to Multipart,拆分成单独的部分,...)。
我现在的问题是HttpResponses的消耗可能需要一段时间(如果页面很大,解组需要一些时间,也许最后会有一些数据库请求来持久化一些数据,...) .因此,如果下游速度较慢,我希望Source.unfoldAsync 能够背压。默认情况下,下一个 HTTP 请求将在前一个请求完成后立即启动。
所以我的问题是:有没有办法让Source.unfoldAsync 对下游缓慢产生背压?如果没有,是否有替代方案可以使背压成为可能?
我可以想象一个使用 akka-http 提供的主机级客户端 API 的解决方案,如 here 所述,以及一个循环图,其中第一个请求的响应将用作输入以生成第二个请求,但我还没有尝试过,我不确定这是否可行。
编辑: 在玩了几天并阅读了文档和一些博客之后,我不确定我是否走在正确的轨道上,我假设 Source.unfoldAsync 的背压行为是根本原因。添加更多观察:
- 启动流时,我看到有几个请求发出。这首先是没有问题的,只要及时消费得到的
HttpResponse(参见here的描述) - 如果我不更改默认
response-entity-subscription-timeout,我会遇到以下错误(我删除了URL):[WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
这会导致IllegalStateException终止流:java.lang.IllegalStateException: Substream Source cannot be materialized more than once - 我观察到响应的解组是流中最慢的部分,这可能是有道理的,因为响应正文是一个多部分文档,因此相对较大。但是,我希望这部分流向上游发出更少的信号(在我的例子中是
Source.unfoldAsync部分)。这应该会导致发出的请求更少。 - 一些谷歌搜索将我带到a discussion about an issue that seems to describe a similar problem。他们还讨论了响应处理速度不够快时出现的问题。 associated merge request 将带来文档更改,建议在继续流之前完全使用
HttpResponse。在对该问题的讨论中,也有对whether or not it's a good idea to combine Akka Http with Akka Streams的质疑。因此,也许我必须更改实现以直接在unfoldAsync调用的函数内部进行解组。
【问题讨论】:
标签: scala akka akka-stream akka-http