【发布时间】:2020-02-20 19:58:20
【问题描述】:
我正在尝试使用 Akka Streams 实现分页。目前我有
case class SomeObject(id:Long, next_page:Option[Map[String,String]])
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] = {
if (uri.isEmpty) return Future.successful(None)
val response: Future[Response[T]] = sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
response.map { resp =>
resp.next_page match {
case Some(next_page) => Some(next_page("uri"), resp.data)
case _ => Some(Uri.Empty, resp.data)
}
}
}
Source.single(SomeObject).map(Uri(s"object/${_.id}")).map(uri => Source.unfoldAsync(url)(chainRequest)).map(...some processing goes here)
问题是,如果我执行 source.take(1000) 并且分页有很多元素(页面),那么下游在 Source.unfoldAsync 完成之前不会获得新元素。
我试图在 Flows 中使用循环,例如
val in = builder.add(Flow[Uri])
val out = builder.add[Flow[T]]
val partition = b.add(Partition[Response[T]](2,r => r.next_page match {case Some(_)=>1; case None => 0}))
val merge = b.add(Merge[Response[T]],2)
in ~> mergeUri ~> sendRequest ~> partition
mergeUri.preferred <~ extractNextUri <~ partition.out(1)
partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)
但上面的代码不起作用。
我坚持创建自己的 GraphStage。 UnfoldAsync 采用第一个元素,但使用 Flow 解决方案我没有“第一个”元素。有什么建议?
谢谢
【问题讨论】:
-
感谢您的回复!不完全是。那里的解决方案使用 Source 作为结果类型。但我想基于 Flow 构建 GraphStage。我当前的 GraphStage 看起来像这样:我从上游得到一些东西 -> onPush() -> 从输入中获取并更改状态 -> 调用 onPull() -> 执行 AsyncCallback 并在成功时再次推送元素。这里的问题是上游完成,我不能在那里推送任何东西。
标签: scala akka akka-stream akka-http