【问题标题】:RxJava: How can I reset a long running hot observable chain?RxJava:如何重置长时间运行的热可观察链?
【发布时间】:2023-03-29 04:55:01
【问题描述】:

对于我的应用程序的搜索功能,我有一个热的可观察链,它执行以下操作。

  1. 接受用户输入字符串到EditTextTextChangedEvent)(mainThread
  2. 去抖动 300 毫秒(在 computation 线程上)
  3. 显示加载微调器 (mainThread)
  4. 使用该字符串查询 SQL 数据库(此查询可能需要 100 毫秒到 2000 毫秒之间的任何时间)(Schedulers.io()
  5. 向用户显示结果 (mainThread)

由于第 3 步的长度变化很大,因此会出现竞争条件,即较近期的搜索结果显示在较近期的结果之上(有时)。假设用户想输入chicken,但由于输入速度很奇怪,单词的第一部分在整个词之前发出:

  • 首先发送搜索chick,然后发送chicken
  • chick 需要1500ms 执行,而chicken 需要300ms 执行。
  • 这会导致chick 搜索结果错误地显示搜索词chicken。这是因为 chicken 搜索首先完成(仅用了 300 毫秒),然后是 chick 搜索(1500 毫秒)。

我该如何处理这种情况?

  • 一旦用户通过TextChangedEvent 触发新搜索,我就不再关心旧搜索,即使它仍在运行。有什么办法可以取消旧的搜索?

完整的可观察代码:

subscription = WidgetObservable.text(searchText)
                .debounce(300, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                        //do this on main thread because it's a UI element (cannot access a View from a background thread)

                        //get a String representing the new text entered in the EditText
                .map(new Func1<OnTextChangeEvent, String>() {
                    @Override
                    public String call(OnTextChangeEvent onTextChangeEvent) {
                        return onTextChangeEvent.text().toString().trim();
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        presenter.handleInput(s);
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s != null && s.length() >= 1 && !s.equals("");
                    }
                }).doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Timber.d("searching for string: '%s'", s);
                    }
                })
                        //run SQL query and get a cursor for all the possible search results with the entered search term
                .flatMap(new Func1<String, Observable<SearchBookmarkableAdapterViewModel>>() {
                    @Override
                    public Observable<SearchBookmarkableAdapterViewModel> call(String s) {
                        return presenter.getAdapterViewModelRx(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                        //have the subscriber (the adapter) run on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                        //subscribe the adapter, which receives a stream containing a list of my search result objects and populates the view with them
                .subscribe(new Subscriber<SearchBookmarkableAdapterViewModel>() {
                    @Override
                    public void onCompleted() {
                        Timber.v("Completed loading results");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "Error loading results");
                        presenter.onNoResults();
                        //resubscribe so the observable keeps working.
                        subscribeSearchText();
                    }

                    @Override
                    public void onNext(SearchBookmarkableAdapterViewModel searchBookmarkableAdapterViewModel) {
                        Timber.v("Loading data with size: %d into adapter", searchBookmarkableAdapterViewModel.getSize());
                        adapter.loadDataIntoAdapter(searchBookmarkableAdapterViewModel);
                        final int resultCount = searchBookmarkableAdapterViewModel.getSize();
                        if (resultCount == 0)
                            presenter.onNoResults();
                        else
                            presenter.onResults();
                    }
                });

【问题讨论】:

    标签: java android system.reactive rx-java rx-android


    【解决方案1】:

    使用switchMap 代替flatMap。这将导致它在您开始新查询时丢弃*前一个查询。

    *这是如何工作的:

    每当外部源 observable 产生一个新值时,switchMap 调用您的选择器以返回一个新的内部 observable(在这种情况下为presenter.getAdapterViewModelRx(s))。 switchMap 然后 unsubscribes 从它正在收听的上一个内部 observable 并 订阅 到新的。

    取消订阅之前的内部 observable 有两个效果:

    1. observable 产生的任何通知(值、完成、错误等)都将被忽略并丢弃。

    2. observable 将被通知其观察者已取消订阅,并且可以选择采取措施取消它所代表的任何异步进程。

    你放弃的查询是否真的被取消完全取决于presenter.getAdapterViewModelRx()的实现。理想情况下,它们将被取消以避免不必要地浪费服务器资源。但即使它们继续运行,上面的 #1 也可以防止您的 typeahead 代码看到陈旧的结果。

    【讨论】:

    • 太棒了!你能更详细地介绍一下它是如何工作的吗? SwitchMap 只是杀死了之前运行的查询 mid query?
    • @ZakTaccardi 添加了更多信息。查询的实际取消是一个合作事务。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多