【问题标题】:Recurring observable streams重复的可观察流
【发布时间】:2016-12-13 18:41:06
【问题描述】:

我有一个可观察到的事件流,这些事件在一个事件上终止。但是,一旦终止,我需要从头开始重新开始流。流的表示如下:

awaitStartEvent()
  .switchMap(value -> awaitSecondEvent(value))
  .subscribe(result -> {
    doSomethingWithResult(result);
    // need to start at awaitStartEvent again here
  }, error -> {
    handleError(error);
    // need to start at awaitStartEvent again here
  }

什么是处理这种情况的好方法?

【问题讨论】:

  • 你试过repeat运营商吗?
  • 不会重复重新订阅最后一个 observable 吗?即它将重新订阅 awaitSecondEvent 而不是 start
  • repeat 重新订阅整个上游。
  • 哦,我把它和重试混淆了,我会测试一下
  • 重试是一样的,但只是在序列以 OnError 而不是 OnComplete 终止的情况下重新订阅。

标签: rx-java system.reactive reactive-programming reactive


【解决方案1】:

根据@maxost 的评论,使用Repeat 运算符。如果您也希望它在遇到错误时继续运行,那么您也需要在其中添加 Retry 运算符。这是您需要放置 handleError 逻辑的地方,因为最终订阅不会看到这些错误。

【讨论】:

    【解决方案2】:

    您需要repeatretry,但由于您在问题中有handleError 副作用,因此异常不会传播到下游以由retry 运算符处理。

    这里使用do 类运算符。

    awaitStartEvent()
      .switchMap(value -> awaitSecondEvent(value))
      .doOnNext(result -> { doSomethingWithResult(result); })
      .doOnError(error -> { handleError(error); })
      .repeat()
      .retry()
      .subscribe();
    

    【讨论】:

      【解决方案3】:
      awaitStartEvent()
          .switchMap(value -> awaitSecondEvent(value))
          .doOnNext(result -> doSomethingWithResult(result)) //handle result
          .repeat() //resubsribe when onComplete called
          .doOnError(error -> handleError(error)) //handle error
          .retry() //resubscribe when onError called
          .subscribe();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-12-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-10-05
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多