【问题标题】:How to stop the Subscriber in RxAndroid?如何在 RxAndroid 中停止订阅者?
【发布时间】:2016-11-24 09:28:34
【问题描述】:

我像这样创建一个 Observable:

public Observable<FileObjectModel> getAllFiles() {
    return Observable.create(new Observable.OnSubscribe<FileObjectModel>() {
        @Override
        public void call(Subscriber<? super FileObjectModel> subscriber) {
            //fileList is a huge list.
            for(int i = 1 ; i < fileList.size(); i ++){
                ...
                subscriber.onNext(fileItem);
            }
        }
    });
}

然后像这样订阅:

subscription = reposistory.getAllFiles()
            .compose(scanInject())
            .subscribe(...);

当用户点击按钮停止时,我想取消订阅,所以我打电话

 subscription.unsubscribe();

不幸的是,它并没有按预期停止。文件项继续发出。

请帮忙指出我的错误。

更新:

我逐行调试,发现 observable 卡在 for 循环中。在这个循环中,我调用一个递归方法并将订阅者作为参数传递。该方法将浏览所有 Android 文件系统(从根文件夹开始)以查找所有文件,然后将它们一一发出。 (我使用这种方式是因为整个系统上的文件数量很大,不应该保存在任何静态列表中。) 由于我们被困在for 循环中,订阅对象始终为空。仍然没有找到任何解决方案。

【问题讨论】:

  • 有什么理由不使用Observable.from(fileList)?传统观点是你应该尽可能避免Observable.create,因为它很容易出错
  • fileList 是通过递归方法提供的,用于获取 Android 文件系统中的所有文件。这意味着我们在开始时没有完整的列表,我必须通过递归方法传递订阅者来发出数据。

标签: rx-java reactive-programming rx-android


【解决方案1】:

首先,如果您要使用Observable.create,那么为了满足许多可能的下游运营商所需的背压,您需要附加.onBackpressureXXX。如果有疑问,请使用.onBackpressureBuffer

你不能取消订阅的原因是你的 observable 是同步的,所以 subscription 直到流完成才设置!我建议一般避免这种模式并改用:

subscriber = ...;
reposistory.getAllFiles()
    .compose(scanInject())
    .subscribe(subscriber);

【讨论】:

  • 所有onBackPressurexxx 都不满足我的情况。我不能使用xxxxDrop 或类似的,因为我不想省略任何文件。 xxxxBuffer 将导致内存不足问题,因为项目的数量非常非常大。我已经采纳了您的建议,现在如何停止/暂停它?
  • 戴夫,按照您的建议,如何停止订阅?
  • subscriber.unsubscribe()
  • 见下面我的回答。这就是我的情况。
【解决方案2】:

我建议检查订阅者是否仍在订阅:

for(int i = 1 ; i < fileList.size(); i ++){
    if (subscriber.isUnsubscribed()) {
        return;
    }
    subscriber.onNext(fileItem);
}

这不应该是过多的计算过载。

【讨论】:

    【解决方案3】:

    无论如何我都不想回答我的问题,但我已经找到了根本原因并采取了解决方法。

    正如我在问题的 update 部分中提到的,for 循环做得非常好,它迭代了数千个项目。而且,处理器被卡在里面了。

    为了解决这个问题,我将不同的线程分配给 observable 并且它异步运行,如下所示:

     subscription = getAllObjects()
                .map(this::doScan)
                .observeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribe(subscriber);
    

    变量subscription现在不再像以前那样null,我们可以通过调用subscription.unsubscribe()来使用它来停止订阅

    【讨论】:

    • 是的,使 observable 异步解决了您的问题,但我已经在回答中解释了这一点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-27
    相关资源
    最近更新 更多