【问题标题】:Rxjava User-Retry observable with .cache operator?使用 .cache 运算符可观察到 Rxjava 用户重试?
【发布时间】:2015-11-14 04:29:58
【问题描述】:

我有一个使用以下代码创建的可观察对象。

Observable.create(new Observable.OnSubscribe<ReturnType>() {

      @Override
      public void call(Subscriber<? super ReturnType> subscriber) {
        try {
          if (!subscriber.isUnsubscribed()) {
            subscriber.onNext(performRequest());
          }
          subscriber.onCompleted();
        } catch (Exception e) {
          subscriber.onError(e);
        }
      }
    });

performRequest() 将按照您的预期执行长时间运行的任务。

现在,由于我可能会在很短的时间内启动 same Observable 两次或更多次,因此我决定编写这样的转换器:

  protected Observable.Transformer<ReturnType, ReturnType> attachToRunningTaskIfAvailable() {
    return origObservable -> {
      synchronized (mapOfRunningTasks) {
        // If not in maps
        if ( ! mapOfRunningTasks.containsKey(getCacheKey()) ) {
          Timber.d("Cache miss for %s", getCacheKey());
          mapOfRunningTasks.put(
              getCacheKey(),
              origObservable
                  .doOnTerminate(() -> {
                    Timber.d("Removed from tasks %s", getCacheKey());
                    synchronized (mapOfRunningTasks) {
                      mapOfRunningTasks.remove(getCacheKey());
                    }
                  })
                  .cache()
          );
        } else {
          Timber.d("Cache Hit for %s", getCacheKey());
        }
        return mapOfRunningTasks.get(getCacheKey());
      }
    };
  }

这基本上将原始.cache 可观察到HashMap&lt;String, Observable&gt;

这基本上不允许具有相同getCacheKey()(例如login)的多个请求并行调用performRequest()。相反,如果第二个 login 请求到达而另一个正在进行中,则第二个可观察请求将被“丢弃”,而将使用已经运行的请求。 => 所有对onNext 的调用都将是cached 并发送给两个订阅者,实际上只访问我的后端一次。

现在,假设以下代码:

// Observable loginTask
public void doLogin(Observable<UserInfo> loginTask) {
    loginTask.subscribe(
      (userInfo) -> {},
      (throwable) -> {
        if (userWantsToRetry()) {
            doLogin(loinTask);
        } 
      }
    );
}

loginTask 是由前一个转换器组成的。好吧,当发生错误(可能是连接)和userWantsToRetry() 时,我基本上会用相同的可观察对象重新调用该方法。不幸的是,它已被缓存,我将收到相同的错误,而无需再次点击 performRequest(),因为该序列被重播。

有没有一种方法可以让转换器提供给我的“相同请求分组”行为和重试按钮?

【问题讨论】:

    标签: reactive-programming rx-java


    【解决方案1】:

    您的问题有很多内容,很难直接表达。不过,我可以提出一些建议。首先,您的Observable.create 可以通过使用Observable.defer(Func0&lt;Observable&lt;T&gt;&gt;) 来简化。这将在每次订阅新订阅者时运行 func,并捕获并将任何异常引导到订阅者的 onError。

    Observable.defer(() -> {
        return Observable.just(performRequest());
    });
    

    接下来,您可以使用observable.repeatWhen(Func1&lt;Observable&lt;Void&gt;, Observable&lt;?&gt;&gt;) 来决定何时重试。重复操作符将在 onComplete 事件后重新订阅 observable。当接收到 onComplete 事件时,此特定重载将向主题发送事件。您提供的功能将收到此主题。当您不想重试时,您的函数应该调用类似 takeWhile(predicate) 和 onComplete 的方法。

    Observable.just(1,2,3).flatMap((Integer num) -> {
        final AtomicInteger tryCount = new AtomicInteger(0);
        return Observable.just(num)
            .repeatWhen((Observable<? extends Void> notifications) -> 
                notifications.takeWhile((x) -> num == 2 && tryCount.incrementAndGet() != 3));
    })
    .subscribe(System.out::println); 
    

    输出:

    1
    2
    2
    2
    3
    

    上面的示例显示,当事件不是 2 并且最多重试 22 次时,会大声重试。如果您切换到repeatWhen,那么 flatMap 将包含您使用缓存的 observable 还是 realWork observable 的决定。希望这会有所帮助!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-12-15
      • 2017-02-17
      • 1970-01-01
      • 1970-01-01
      • 2017-01-24
      • 1970-01-01
      相关资源
      最近更新 更多