【发布时间】:2016-05-11 13:07:45
【问题描述】:
快速提问,因为我觉得我一定错过了什么。 我在这里使用 rxjs 是因为它是我面前的东西,我相信这是一个通用的 reactiveX 问题。
假设我有一组像这样的 Observable:
network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection
所以,这工作正常。 不过有问题。 有时连接失败。
所以,如果我这样做 event_stream.retry() 效果很好。当它失败时,它会重新进行网络调用并获取一个新的v 用于建立新的连接。
问题
如果我想将两个东西从我的network_request 中链接起来,会发生什么?
也许我希望 UI 在每次网络调用完成时做一些事情,比如在 UI 中显示一些关于 v 的内容?
我能做到:
shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response
如果我不做share那么它会发出两个请求,这不是我想要的,但是用share,当event_stream稍后出现错误时,它不会重试网络请求,因为引用计数仍然为 1(由于ui_stream),所以它立即返回完成。
我想要什么
这显然是我为了解释我的困惑而编造的一个小例子。
我想要的是,每次event_stream(长期连接)的结果出现错误时,都会发生以下所有情况:
- 再次发出网络请求
- 该请求的新响应用于建立新连接,
event_stream继续处理新事件,就像什么都没发生一样 -
ui_stream中也会发出相同的响应以进行进一步处理
感觉这不是一件复杂的事情,所以在拆分/扇出 RX 事物时,我一定只是误解了一些基本的东西。
我认为我可以做但想避免的解决方法
我希望导出这些 observables,所以我不能只是重新构建它们然后说“嘿,这是新事物”。我希望event_stream 和所有下游处理都不知道断开连接。
ui_stream 也一样。它刚刚获得了新的价值。
我可能会使用 Subject 作为生成计数器来解决问题,每次我希望一切重新启动时都会 ping,然后基于此将 network_request 放入 flatMap,这样我就可以打破share...
但这感觉像是一个非常老套的解决方案,所以我觉得必须有比这更好的方法。
我从根本上误解了什么?
【问题讨论】:
-
重要的是要了解,您的
sharedobservable 不知道event_stream订阅者中发生的错误。retry只是在每次遇到onError时重新订阅,所以对于shared来说,它又是一个订阅者。也许我错了,但我认为如果没有某种反馈循环(这意味着引入某种状态),你就无法解决这个问题。更好的选择是重新考虑您的解决方案以摆脱这个问题。
标签: system.reactive rxjs reactivex