【发布时间】:2021-07-06 15:50:39
【问题描述】:
我正在构建一个应用程序,在该应用程序中,我接受用户的请求,调用 REST API 以取回一些数据,然后根据该响应,进行另一个 HTTP 调用等等。基本上,我正在处理一棵数据树,其中树中的每个节点都需要我递归调用此 API,如下所示:
A
/ \
B C
/ \ \
D E F
我使用 Akka HTTP 和 Akka Streams 来构建应用程序,所以我使用它的流 API,如下所示:
val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Data](2))
val bcast = b.add(Broadcast[ResponseData](2))
takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
merge <~ extractSubtree <~ bcast
FlowShape(takeUserData.in, bcast.out(1))
}
我知道在 Akka Streams 应用程序中处理递归的最佳方法是在流之外处理递归,但是由于我递归调用 HTTP 流来获取每个数据子树,所以我想确保在 API 过载的情况下,flow 被适当地反压。
问题是这个流永远不会完成。如果我将它连接到这样的简单来源:
val source = Source.single(data)
val sink = Sink.seq[ResponseData]
source.via(flow).runWith(sink)
它打印出它正在处理树中的所有数据,然后停止打印任何东西,永远处于空闲状态。
我阅读了the documentation about cycles,建议在其中放置MergePreferred,但这似乎没有帮助。 This question 有所帮助,但我不明白为什么 MergePreferred 不会停止死锁,因为与他们的示例不同,元素在树的每一级都从流中删除。
为什么MergePreferred 不能避免死锁,还有其他方法吗?
【问题讨论】:
-
您在流中的哪个位置打印树中的所有数据正在处理? (例如,在处理之前还是之后?)
-
在processResponse中,有一个println语句,我可以看到打印的所有数据元素。
-
所以每个元素都被处理了,只是流没有完成?你在
MergePreferred中使用eagerComplete = true吗? -
正确,所有元素都得到处理,只是流没有完成。如果我在
MergePreferred中使用eagerComplete = true,则流完成,但仅打印未反馈到循环中的元素(换句话说,打印来自树根的元素,而不是下一层)跨度> -
当涉及到解决方案时,是否要求(如果处理多棵树),以
A为根的整个树在以G为根的树之前被处理/发出(G不是A的孩子)?
标签: akka-stream akka-http