【发布时间】:2014-03-17 16:35:46
【问题描述】:
我有消息的来源,即Observable。对于每条消息,我想进行一个 HTTP 调用,这将产生另一个 Observable,因此我将它们与 flatMap 组合在一起,然后将它们发送给某个订阅者。这里是这个场景的代码:
Rx.Observable.interval(1000)
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.subscribe (result) ->
console.info "Processed: ", result
这个例子是用 coffeescript 写的,但我认为问题陈述对任何其他 Rx 实现都是有效的。
我使用这种方法遇到的问题是loadMessages 会非常快速地生成大量消息。这意味着,我在很短的时间内发出了很多 HTTP 请求。这在我的情况下是不可接受的,所以我想将并行 HTTP 请求的数量限制在 10 个左右。换句话说,当我发出 HTTP 请求时,我想限制管道或应用某种背压。
Rx 是否有任何标准方法或最佳实践来处理这种情况?
目前我实现了非常简单(并且非常次优)的背压机制,如果系统处理的按摩过多,则忽略滴答声。它看起来像这样(简化版):
Rx.Observable.interval(1000)
.filter (tick) ->
stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
stats.messageIn()
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.do (tick) ->
stats.messageOut()
.subscribe (result) ->
console.info "Processed: ", result
但我不确定这是否可以做得更好,或者 Rx 可能已经有一些机制来处理这种需求。
【问题讨论】:
标签: system.reactive reactive-programming rxjs