【问题标题】:Conditional chain of observables可观察的条件链
【发布时间】:2016-07-27 13:54:37
【问题描述】:

我想通过多个 REST API 异步检索数据。我在带有 rxJava 扩展的 Android 上使用 Retrofit,即我通过订阅 Observable 来执行任何 GET 请求。

正如我所说,我有多个源 API,所以当第一个源没有产生所需的结果时,我想尝试下一个,如果同样失败,请再次尝试下一个,依此类推,直到所有源都已完成查询或找到结果。

我正在努力将这种方法转化为对 Observables 的正确使用,因为我不知道哪些运营商可以实现这种行为,而且还需要遵守一些限制:

  • 找到结果后,不应查询剩余的 API(如果有)
  • 其他组件依赖于查询的结果,我希望它们在启动请求的时候得到一个Observable,这样这个Observable就可以通知它们请求完成了
  • 我需要保留对上述 Observable 的引用,因为在完成之前可能会多次发出相同的请求,在这种情况下,我只会在第一次需要它时启动它,后续请求只会获得通知的 Observable当请求完成时

我一开始只使用一个 API 进行查询,并使用以下 API 进行依赖组件的请求和后续通知:

private Observable<String> loadData(int jobId) {

    final ConnectableObservable<String> result = Async
            .fromCallable(() -> getResult(jobId))
            .publish();

    getRestRequest()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    dataHolder -> {
                        if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) {
                            saveData(dataHolder.getData());
                        } else {
                            markNotFound(dataHolder);
                        }
                    },
                    error -> currentJobs.remove(jobId),
                    () -> {
                        currentJobs.remove(jobId);
                        result.connect();
                    });

    return result;
}

此代码仅在第一次请求时调用,返回的 Observable 结果将保存在 currentJobs 中,后续请求只会获取 Observable 而不会再次触发请求。

非常感谢任何帮助。

【问题讨论】:

    标签: android rx-java retrofit2 reactivex


    【解决方案1】:

    假设您有一组在您每次订阅时重新连接的可观察对象:

    List<Observable<Result>> suppliers = ...
    

    那么你只需要做合乎逻辑的事情:

    Observable<Result> results = Observable
              .from(suppliers)
              .concatMap(supplier -> supplier)
              .takeFirst(result -> isAcceptable(result))
              .cache()
    

    【讨论】:

    • 我认为这是要走的路。我找到了类似的方法here。 concat() 和 from() 之间有什么功能上的区别吗?在链接的博客中,他们强调 concat() 仅在需要时进行订阅,这也适用于 from() 吗? concatMap() 的作用是什么?
    • from 只是从特定来源创建一个 Observable。 concat 接受多个 Observable 并仅在前一个结束时才开始从下一个获取项目。 concatMap 更进一步(不完全等于 concat + map)。
    【解决方案2】:

    使用.onErrorResumeNext,并假设每个服务可观察对象可能返回 0 或 1 个元素,如果没有发出任何元素,则使用 first 发出错误:

    Observable<T> a, b, c;
    ...
    a.first().onErrorResumeNext(t -> b.first())
     .onErrorResumeNext(t -> c.first())
     .onErrorResumeNext(t -> d.first())
     ...
    

    【讨论】:

    • 当其中一个请求完成但没有找到请求的数据时,它会返回一个 emtpy dataHolder。这不会导致抛出错误,因此这种方法无法按预期工作。
    • 我已经使用first 更新了答案来处理您的额外要求
    猜你喜欢
    • 1970-01-01
    • 2020-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-12
    • 1970-01-01
    • 2019-11-20
    相关资源
    最近更新 更多