【问题标题】:RxJava operator that combines two observables and emits as soon as first observable emitsRxJava 运算符,它结合了两个可观察对象并在第一个可观察对象发出时立即发出
【发布时间】:2018-01-24 09:50:10
【问题描述】:

问题: 我有一个功能,用户可以输入查询字符串,并且我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。 我需要尽快显示来自 DB 的结果,当 API 结果返回时,我需要检查以删除与本地结果的重复项。

我的方法: CombineLatest 似乎是最接近我需要的东西。但问题是它只有在两个可观察对象都发出结果时才会发出。我认为我正在寻找的是 CombineLatest 和 Concat 运算符的混合体。

这是我迄今为止尝试过的:

public void performSearchAsync(String query) {
    Observable<List<LocalSearchResult>> localObservable = Observable.just(performLocalSearch(query))
            .subscribeOn(Schedulers.io());
    Observable<Response<List<ApiSearchResult>>> apiObservable = SearchApi.INSTANCE.getSearchResults(query)
            .subscribeOn(Schedulers.io());

    Observable.combineLatest(localObservable, apiObservable, (localSearchResults, listResponse) -> {
        SearchResultWrapper wrapper = new SearchResultWrapper();
        wrapper.localResults = localSearchResults;
        //hold all local result strings in a variable
        List<String> localResultNames = new ArrayList<>();
        for (LocalSearchResult localSearchResult : localSearchResults) {
            localResultNames.add(localSearchResult.name);
        }

        if (!listResponse.isSuccessful()) {
            return wrapper;
        }
        wrapper.apiResults = listResponse.body();

        //Do simple de-dupe
        List<ApiSearchResult> deDupedResults = new ArrayList<>();
        for (ApiSearchResult apiResult : wrapper.apiResults) {
            if (!localResultNames.contains(apiResult.name)) {
                deDupedResults.add(apiResult);
            }
        }
        wrapper.apiResults = deDupedResults;
        return wrapper;
    }).distinctUntilChanged(searchResultWrapper -> searchResultWrapper.localResults)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<SearchResultWrapper>() {
                @Override
                public void onNext(SearchResultWrapper searchResultWrapper) {
                    if (searchResultWrapper == null) return;
                    if (searchResultWrapper.localResults != null && !searchResultWrapper.localResults.isEmpty()) {
                        //add to view
                        applySearchResult(searchResultWrapper.localResults);
                    }
                    if (searchResultWrapper.apiResults == null || searchResultWrapper.apiResults.isEmpty())
                        return;
                    //add to view
                    apiSearchResultsAdapter.updateResults(searchResultWrapper.apiResults);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onCompleted() {

                }
            });
}

static class SearchResultWrapper {
    List<LocalSearchResult> localResults;
    List<ApiSearchResult> apiResults;
}

在这里,我假设我的本地查询总是比 API 快,并且如果它更慢,我希望运算符仅在本地 DB 可观察对象发出后才发出。我知道重复数据删除逻辑可以改进,它只是一个示例表示,但我希望它发生在我有两个结果的那个函数中。

即使 API observable 抛出错误,我也希望发出本地结果并可供用户使用,尽管我不确定这种结构是否可行。

希望我的解释足够清楚,有人可以建议一种合适的方法来解决这个问题。我也在考虑编写自己的运算符,虽然我听说这很棘手。

【问题讨论】:

  • 您说这两个操作必须并行运行,但您使用Observable.just 获取本地结果,这意味着performLocalSearchQuery() 将同步运行。您应该将justObservable.defer 包装在一起,以使performLocalSearchQuery()io 调度程序上运行。
  • @JanosBreuer 是的,感谢您指出错误。我已按照您的建议将justObservable.defer 包装在一起。

标签: android rx-java


【解决方案1】:

其实你不需要把它弄得太复杂。只需使用mergeconcatdoOnNext 就可以了。

例如(对不起,我在这里使用 kotlin):

    // actually here is a mistake. Observable.just() receive a item rather than a expression.
    // you should use Observable.fromCallable() instead.
    val localObservable = Observable.fromCallable { performLocalSearch(query) }
        .subscribeOn(Schedulers.io())


    val apiObservable = SearchApi.INSTANCE.getFoodSearchResults(query)
        .doOnNext {
            //do your check here
        }
        .subscribeOn(Schedulers.io())
        .map{
            //map it to the same type with local
        }


    localObservable.mergeWith { apiObservable}

或者您可以使用concatWith 而不是mergeWith,这样可以保证在localObservable Done 之后调用您的api 调用。

【讨论】:

  • 我同意你关于将一个对象映射到另一个对象的观点,以便简化。我自己也考虑过。另外,在这种方法中,我需要保存对本地搜索结果的全局引用,以将其与 apiObservable 的 doOnNext 块进行比较,对吧?
  • @hari_shankar 是的。您需要比较该结果。如果你不想。您可以使用scan 来完成此操作,它将为您缓存一个值并使用它进行计算。
  • 对不起,我花了这么长时间才回来。我能够让它与您的解决方案一起使用。我将 API 结果映射到本地结果,并应用了mergeDelayError 运算符,以便一个流中的错误不会破坏另一个流。还使用scan 运算符删除重复项。
猜你喜欢
  • 1970-01-01
  • 2020-03-05
  • 2020-02-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-23
  • 1970-01-01
相关资源
最近更新 更多