【问题标题】:RxJava: calling unsubscribe from within onNextRxJava:从 onNext 内部调用取消订阅
【发布时间】:2015-06-24 12:51:25
【问题描述】:

我想知道在 onNext 处理程序中这样调用 unsubscribe 是否合法:

List<Integer> gatheredItems = new ArrayList<>();

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    public void onNext(Integer item) {
        gatheredItems.add(item);
        if (item == 3) {
            unsubscribe();
        }
    }
    public void onCompleted() {
        // noop
    }
    public void onError(Throwable sourceError) {
        // noop
    }
};

Observable<Integer> source = Observable.range(0,100);

source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);

上面的代码正确输出只收集了四个元素:[0, 1, 2, 3]。但是,如果有人更改要缓存的源 observable:

Observable<Integer> source = Observable.range(0,100).cache();

然后收集所有一百个元素。我无法控制可观察的源(无论是否缓存),那么如何从onNext 中明确取消订阅?

顺便说一句:那么在onNext 内取消订阅是错误的做法吗?

(我的实际用例是,在onNext 中,我实际上正在写入输出流,当IOException 发生时,没有更多内容可以写入输出,因此我需要以某种方式停止进一步处理。)

【问题讨论】:

    标签: java rx-java


    【解决方案1】:

    第一个订阅者到达后,cache() 会缓存所有内容,并且它没有提供任何选项来阻止它从下游。您需要在 cache() 之前停止流以避免过多的保留:

    Observable<Integer> source = Observable.range(1, 100);
    
    PublishSubject<Integer> stop = PublishSubject.create();
    
    source
    .doOnNext(v -> System.out.println("Generating " + v))
    .takeUntil(stop)
    .cache()
    .doOnNext(new Action1<Integer>() {
        int calls;
        @Override
        public void call(Integer t) {
            System.out.println("Saving " + t);
            if (++calls == 3) {
                stop.onNext(1);
            }
        }
    })
    .subscribe();
    

    编辑:上面的例子在 1.0.13 以下不起作用,所以这里有一个版本应该:

    SerialSubscription ssub = new SerialSubscription();
    
    ConnectableObservable<Integer> co = source
            .doOnNext(v -> System.out.println("Generating " + v))
            .replay();
    
    co.doOnNext(v -> {
        System.out.println("Saving " + v);
        if (v == 3) {
            ssub.unsubscribe();
        }
    })
    .subscribe();
    
    co.connect(v -> ssub.set(v));
    

    【讨论】:

    • 此代码无法按预期工作,它会打印一百次“Generating”,然后是一百次“Saving”。我只希望“节省”四次。
    • 我在发布之前运行了我的示例,因此它每行打印 3 行。您是逐字尝试还是尝试将其插入流程中?
    • 我逐字使用了您的示例,只是添加了 main 和 imports。我在两台不同的计算机上的 javarx 1.0.12 上运行它,它总是打印“Saving”一百次。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-25
    相关资源
    最近更新 更多