【问题标题】:Stop rxJava observable chain execution on disposing在处理时停止 rxJava 可观察链执行
【发布时间】:2017-09-04 18:21:19
【问题描述】:

在应用程序中调试 rxJava 网络调用时,我遇到了一种情况,如果我们通过订阅 observables 链返回的 disposeclear 处置对象,那么只有第一个 observable 被处置而不是其他由flatMap链接observables

看看下面的演示代码sn-p:

CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(()-> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();

    testCompositeDisposal.add(disposable);
}

@Override
public void onStop() {
    super.onStop();
    testCompositeDisposal.clear();
}

输出:

 W/Debug:: First: 0
 W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
 W/Debug:: First: 1
 W/Debug:: First: 2
 W/Debug:: First: 3
 W/Debug:: First: 4

正如您在上面的日志输出中看到的那样,当我处理给定的 rxJava 可观察链时,只有第一个可观察的会停止发射项目。

我想停止所有被链接的可观察对象。

解决这个问题的惯用方法是什么?

【问题讨论】:

  • 所有那些 Observable.create 的东西看起来都很邪恶:-)。首先:您还需要将平面映射的 observables 添加到复合一次性用品中。第二:不要到处 create() 。这不是 rxjava 的设计方式。带有 .timeout/.timer/.delay 的 Observable.just( ...) 或 Observable.range() 效果很好。
  • @EmanuelSeibold 这只是我的实现演示。我在这里使用计时器来模拟网络调用。我已经使用 doOnSubscribe 来为平面映射的 observable 获取一次性的。它抛出了奇怪的异常。

标签: android rx-java rx-java2


【解决方案1】:

两件事:

  • flatMap 可以预先消费来自上游的物品(Android 上最多 16 个);
  • 第二个更适用于您的用例,在您调用onNext 之前,您应该检查观察者是否已处置(通过.isDisposed())并在发生这种情况时中止。

另外,second flatMap 被终止(实际上它永远不会被调用)。 第一个继续。

编辑

private void testLoadData() { 
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            if(sbr.isDisposed()) return;  // this will cause subscription to terminate.
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true); 
        } 
        sbr.onComplete(); 
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> { 
        for (int i = 0; i < 5; i++) { 
            Thread.sleep(3000); 
            Log.w("Debug: ", "Second: " + i); 
            sbr.onNext(true); 
        } 
        sbr.onComplete(); 
    })).doOnNext(value -> { 
        Log.w("Debug: ", "doONNext"); 
    }).doOnDispose(()-> { 
        Log.w("Debug: ", "doOnDispose: observable has been disposed"); 
    }).subscribe(); 

    testCompositeDisposal.add(disposable); 
} 

【讨论】:

  • 我可以应付第二点,但这是唯一的方法吗?我的意思是这是惯用的方式吗?
  • 还有一件事,我该如何中止?通过抛出可观察到的错误?
  • 是的,这是终止迭代的正确方法。而当订阅者被释放时,你只需从 lambda 中返回,不需要其他任何东西。请注意,您没有解释您的用例是什么,因为到目前为止您展示的所有内容都可以使用现有的运算符完成。
  • 是的,谢谢。我的用例是我需要调用 Web 服务,并在响应到来时存储它。在用户离开之间,应用程序数据库对象被关闭并抛出异常“数据库未打开”。因此应用程序崩溃。
  • @DuosDuo 在使用 RxJava2 和改造 3 年后,我的看法发生了变化;我认为应该积极尝试避免显式创建,而是依赖内置运算符。在我们的项目中,每 30K SLOC 大约有 1 个显式 subscribe 和 1 个显式 generate,这些是用于不可分解的非常时髦的流。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-29
相关资源
最近更新 更多