【问题标题】:Message processing throttling/backpressure消息处理节流/背压
【发布时间】:2014-03-17 16:35:46
【问题描述】:

我有消息的来源,即Observable。对于每条消息,我想进行一个 HTTP 调用,这将产生另一个 Observable,因此我将它们与 flatMap 组合在一起,然后将它们发送给某个订阅者。这里是这个场景的代码:

Rx.Observable.interval(1000)
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

这个例子是用 coffeescript 写的,但我认为问题陈述对任何其他 Rx 实现都是有效的。

我使用这种方法遇到的问题是loadMessages 会非常快速地生成大量消息。这意味着,我在很短的时间内发出了很多 HTTP 请求。这在我的情况下是不可接受的,所以我想将并行 HTTP 请求的数量限制在 10 个左右。换句话说,当我发出 HTTP 请求时,我想限制管道或应用某种背压。

Rx 是否有任何标准方法或最佳实践来处理这种情况?

目前我实现了非常简单(并且非常次优)的背压机制,如果系统处理的按摩过多,则忽略滴答声。它看起来像这样(简化版):

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.do (tick) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

但我不确定这是否可以做得更好,或者 Rx 可能已经有一些机制来处理这种需求。

【问题讨论】:

    标签: system.reactive reactive-programming rxjs


    【解决方案1】:

    这不是严格意义上的背压,这只是限制并发。这是一个简单的方法(忽略我可能错误的语法,通过 TextArea 编码):

    Rx.Observable.interval(1000)
        .flatMap (tick) ->
            // returns an `Observable`
            loadMessages()
        .map (message) ->
            // also returns and `Observable`, but only when
            // someone first subscribes to it
            Rx.Observable.defer ->
                makeHttpRequest(message)
        .merge 10 // at a time
        .subscribe (result) ->
            console.info "Processed: ", result
    

    在 C# 中,等效的想法是 SelectMany,而不是 Select(Defer(x)).Merge(n)Merge(int) 最多订阅n in-flight Observables,并将其余的缓存到以后。我们拥有Defer 的原因是为了让我们在Merge(n) 订阅我们之前不做任何工作。

    【讨论】:

    • 但是你有一个问题,你的 observables 作业队列建立起来并且一次只处理 10 个。如果您处理它们的速度不够快,则会出现内存泄漏。您想用采样器替换 Rx.Observable.interval(1000),当活动作业的数量低于 10 时,该采样器会生成新的 observables。那就是 backpressure
    • 你在一般情况下是正确的,但看看他们提供的样本,我认为这不是问题
    • 很高兴知道延期。我认为@bradgonesurfing 有一个正确的观点,所以消息缓冲也应该考虑在内。
    【解决方案2】:

    在 RXJS 中你可以使用背压子模块

    http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

    disclaimer我从未使用过 JS 的 RX 版本,但您确实要求一种实现背压的标准方法,并且核心库似乎支持它。 RX c# 尚无此支持。不知道为什么。

    【讨论】:

      【解决方案3】:

      听起来您想从队列中拉取而不是推送您的 http 请求。 Rx 真的是这里正确的技术选择吗?

      编辑:

      一般来说,我不会使用 Rx 设计一个我对源事件有完全命令式控制的解决方案。这不是一个被动的场景。

      Rxjs 中的背压模块显然是为处理您不拥有源流的情况而编写的。给你。

      TPL 数据流听起来更适合这里。

      如果你必须使用 RX,你可以像这样设置一个循环:如果你想限制 X 个并发事件,设置一个主题作为你的消息源并强制推送(@ 987654323@) X 条消息。在您的订阅者中,您可以在 OnNext 处理程序的每次迭代中将新消息推送到主题,直到源耗尽。这保证了最多 X 条消息在传输中。

      【讨论】:

      • 你是对的,在这种特殊情况下,我正在拉扯。我提供了更多细节只是为了使示例更具体,但这并不意味着它将始终保持这种状态。在某些时候,我可以(并且希望)切换到消息源的推送模型。恕我直言,它不会有太大变化。我仍然很有可能会非常快地大量推送消息,我需要以某种方式“保护”调用makeHttpRequest方法的管道的中间部分。
      • “Rx 真的是这里正确的技术选择吗?”这是我想知道的。到目前为止,它对于我目前实现的相对复杂的消息处理流程来说效果很好。所以我感觉它非常适合这项任务。我只有几个小问题,这就是其中之一。我需要在几个地方应用这种背压。我为他们找到了自己的解决方案(主要是通过缓冲一些消息并施加背压),但我只是想知道,是否有更惯用的方法来解决这类问题。
      • This question,虽然没有直接解决您的问题,但可能很有趣。我担心需要并行化——单独的 Rx 通常不适合在这里——TPL Dataflow 可能会更好。否则,您可以通过将 Rx 管道推送到队列并使用工作人员处理请求来组合方法,并通过主题将工作人员结果推送出去。这通常比直接在 Rx 中处理背压更简单。
      • 用一种限制管道中消息的简单方法更新了我的答案。
      猜你喜欢
      • 1970-01-01
      • 2017-05-19
      • 2017-06-04
      • 2017-12-28
      • 1970-01-01
      • 1970-01-01
      • 2021-06-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多