【问题标题】:ReactiveX Retry with Multiple ConsumersReactiveX 与多个消费者一起重试
【发布时间】:2016-05-11 13:07:45
【问题描述】:

快速提问,因为我觉得我一定错过了什么。 我在这里使用 rxjs 是因为它是我面前的东西,我相信这是一个通用的 reactiveX 问题。

假设我有一组像这样的 Observable:

network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
    return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection

所以,这工作正常。 不过有问题。 有时连接失败。

所以,如果我这样做 event_stream.retry() 效果很好。当它失败时,它会重新进行网络调用并获取一个新的v 用于建立新的连接。

问题

如果我想将两个东西从我的network_request 中链接起来,会发生什么? 也许我希望 UI 在每次网络调用完成时做一些事情,比如在 UI 中显示一些关于 v 的内容?

我能做到:

shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response

如果我不做share那么它会发出两个请求,这不是我想要的,但是用share,当event_stream稍后出现错误时,它不会重试网络请求,因为引用计数仍然为 1(由于ui_stream),所以它立即返回完成。

我想要什么

这显然是我为了解释我的困惑而编造的一个小例子。 我想要的是,每次event_stream(长期连接)的结果出现错误时,都会发生以下所有情况:

  1. 再次发出网络请求
  2. 该请求的新响应用于建立新连接,event_stream 继续处理新事件,就像什么都没发生一样
  3. ui_stream 中也会发出相同的响应以进行进一步处理

感觉这不是一件复杂的事情,所以在拆分/扇出 RX 事物时,我一定只是误解了一些基本的东西。

我认为我可以做但想避免的解决方法

我希望导出这些 observables,所以我不能只是重新构建它们然后说“嘿,这是新事物”。我希望event_stream 和所有下游处理都不知道断开连接。 ui_stream 也一样。它刚刚获得了新的价值。

我可能会使用 Subject 作为生成计数器来解决问题,每次我希望一切重新启动时都会 ping,然后基于此将 network_request 放入 flatMap,这样我就可以打破share... 但这感觉像是一个非常老套的解决方案,所以我觉得必须有比这更好的方法。

我从根本上误解了什么?

【问题讨论】:

  • 重要的是要了解,您的shared observable 不知道event_stream 订阅者中发生的错误。 retry 只是在每次遇到 onError 时重新订阅,所以对于 shared 来说,它又是一个订阅者。也许我错了,但我认为如果没有某种反馈循环(这意味着引入某种状态),你就无法解决这个问题。更好的选择是重新考虑您的解决方案以摆脱这个问题。

标签: system.reactive rxjs reactivex


【解决方案1】:

当我一直在思考这个问题时,我得出了与 ionoy 相同的认识,即 retry 只是断开连接并重新连接,而上游不知道这是由于错误造成的。

当我想到我想要什么时,我意识到我真的想要一个像链条这样的东西,也是一个观众,所以我现在有了这个:

network_request = some_thing
network_shadow = new Rx.Subject()

event_stream = network_request.do(network_shadow).flatMapLatest(...)
ui_stream = network_shadow.whatever

这具有在event_stream 或下游重试将导致整个事情重新启动的属性,而ui_stream 是它自己的事情。 那里的任何错误都不会做任何事情,因为network_shadow 实际上并不是event_stream 的订阅者,但只要主事件链正在运行,它就会剥离值。

我觉得这并不理想,但它比我担心我必须做的更好,即在doOnError 中添加一个restartEverything.onNext(),这会很糟糕。

我现在要处理这个问题,我们会看看它在哪里咬我......

【讨论】:

    【解决方案2】:

    您需要使用 Publish 使您的冷可观察到热。阅读http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#HotAndCold 以获得很好的解释。

    【讨论】:

    • 是的,我知道冷热。我遇到的问题是我想让它变热,但我的一个订阅者仍然有一个错误导致整个事情被处理掉。我考虑编写一个与refcount 相反的实现,在任何时候任何订阅者取消订阅我都会从上游取消订阅,但这可能会导致问题,所以我做了这个“侧通道”的东西。
    猜你喜欢
    • 1970-01-01
    • 2020-04-07
    • 2020-09-29
    • 2011-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-14
    • 1970-01-01
    相关资源
    最近更新 更多