【问题标题】:Akka HTTP streaming API with cycles never completes带有循环的 Akka HTTP 流 API 永远不会完成
【发布时间】:2021-07-06 15:50:39
【问题描述】:

我正在构建一个应用程序,在该应用程序中,我接受用户的请求,调用 REST API 以取回一些数据,然后根据该响应,进行另一个 HTTP 调用等等。基本上,我正在处理一棵数据树,其中树中的每个节点都需要我递归调用此 API,如下所示:

     A
    / \
   B   C
  / \   \
 D   E   F

我使用 Akka HTTP 和 Akka Streams 来构建应用程序,所以我使用它的流 API,如下所示:

val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
   import GraphDSL.Implicits._

   val merge = b.add(Merge[Data](2))
   val bcast = b.add(Broadcast[ResponseData](2))

   takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
                   merge <~         extractSubtree                       <~ bcast

   FlowShape(takeUserData.in, bcast.out(1))
}

我知道在 Akka Streams 应用程序中处理递归的最佳方法是在流之外处理递归,但是由于我递归调用 HTTP 流来获取每个数据子树,所以我想确保在 API 过载的情况下,flow 被适当地反压。

问题是这个流永远不会完成。如果我将它连接到这样的简单来源:

val source = Source.single(data)
val sink = Sink.seq[ResponseData]

source.via(flow).runWith(sink)

它打印出它正在处理树中的所有数据,然后停止打印任何东西,永远处于空闲状态。

我阅读了the documentation about cycles,建议在其中放置MergePreferred,但这似乎没有帮助。 This question 有所帮助,但我不明白为什么 MergePreferred 不会停止死锁,因为与他们的示例不同,元素在树的每一级都从流中删除。

为什么MergePreferred 不能避免死锁,还有其他方法吗?

【问题讨论】:

  • 您在流中的哪个位置打印树中的所有数据正在处理? (例如,在处理之前还是之后?)
  • 在processResponse中,有一个println语句,我可以看到打印的所有数据元素。
  • 所以每个元素都被处理了,只是流没有完成?你在MergePreferred 中使用eagerComplete = true 吗?
  • 正确,所有元素都得到处理,只是流没有完成。如果我在MergePreferred 中使用eagerComplete = true,则流完成,但仅打印未反馈到循环中的元素(换句话说,打印来自树根的元素,而不是下一层)跨度>
  • 当涉及到解决方案时,是否要求(如果处理多棵树),以A 为根的整个树在以G 为根的树之前被处理/发出(G 不是A 的孩子)?

标签: akka-stream akka-http


【解决方案1】:

MergePreferred(在没有eagerComplete 为真的情况下)将在所有输入完成后完成,这对于 Akka Streams 中的阶段通常是正确的(完成从一开始就向下流动)。

这意味着在输入和extractSubtree 信号完成之前,合并无法传播完成。 extractSubtreebcast 发出完成信号之前不会发出完成信号(很可能是在不知道该流程中的阶段的情况下),这(再次很可能)在processResponse 发出完成信号之前不会发生,直到@ 才会发生987654327@ 表示完成,直到createRequest 发出完成信号才会发生,直到merge 发出完成信号才会发生。因为一般来说检测这个循环是不可能的(考虑到有些阶段的完成是完全动态的),Akka Streams 有效地采取了这样的立场:如果你想创建这样的循环,你可以决定如何打破这个循环。

正如您所注意到的,eagerComplete 为 true 会改变这种行为,但是因为它会在任何输入完成后立即完成(在这种情况下,由于循环,它将始终是输入)merge 完成并且取消对extractSubtree 的需求(这本身可能(取决于Broadcast 是否设置了eagerCancel)导致下游取消),这可能会导致extractSubtree 发出的至少一些元素未被处理。

如果您绝对确定输入完成意味着循环最终会干涸,您可以使用eagerComplete = false,如果您有一些方法可以在循环干涸并且输入完成后完成extractSubtree。一个大致的大纲(不知道具体是什么extractSubtree):

  • 将从bcast 进入extractSubtree 的所有内容映射到输入的Some
  • 预实现一个Source.actorRef,您可以将None 发送到该@,保存ActorRef(这将是此源的具体化值)
  • 将输入与预先实现的源合并
  • 在提取子树时,使用statefulMapConcat 阶段来跟踪a)是否已经看到了None 和b)有多少子树待处理(初始值为1,加上这个(第一代)子树的数量节点减 1,即没有子节点减 1);如果看到None 并且没有子树待处理,则发出List(None),否则发出List 包裹在Some 中的每个子树
  • 有一个takeWhile(_.isDefined),一旦看到None就完成
  • 如果您在 extractSubtrees 中有更复杂的东西(例如副作用),您必须弄清楚将它们放在哪里
  • 在合并外部输入之前,将其通过watchTermination 阶段,并在未来回调(成功时)将None 发送到ActorRef,您在为Source.actorRef 预实现extractSubtrees 时获得。因此,当输入完成时,watchTermination 将成功触发并有效地向extractSubtrees 发送一条消息,以观察它何时完成了飞行树。

【讨论】:

  • 感谢您非常详细的回答 - 非常有帮助。 “无限循环”发生在单元测试中,我最终通过take() 调用打破了该测试。在生产中,图的“两端”将分别连接到 JMS 源和接收器。在生产中,由于循环是无限的,这最终会导致图表死锁,还是只是让它保持安全?
  • 如果源有机地永远不会完成,我会说流打算永远运行(或至少直到程序终止参与者系统或被杀死),所以不完成不是问题.
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-18
  • 1970-01-01
  • 2017-05-13
  • 1970-01-01
  • 2017-01-27
  • 1970-01-01
相关资源
最近更新 更多