【发布时间】:2015-06-24 12:51:25
【问题描述】:
我想知道在 onNext 处理程序中这样调用 unsubscribe 是否合法:
List<Integer> gatheredItems = new ArrayList<>();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
public void onNext(Integer item) {
gatheredItems.add(item);
if (item == 3) {
unsubscribe();
}
}
public void onCompleted() {
// noop
}
public void onError(Throwable sourceError) {
// noop
}
};
Observable<Integer> source = Observable.range(0,100);
source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
上面的代码正确输出只收集了四个元素:[0, 1, 2, 3]。但是,如果有人更改要缓存的源 observable:
Observable<Integer> source = Observable.range(0,100).cache();
然后收集所有一百个元素。我无法控制可观察的源(无论是否缓存),那么如何从onNext 中明确取消订阅?
顺便说一句:那么在onNext 内取消订阅是错误的做法吗?
(我的实际用例是,在onNext 中,我实际上正在写入输出流,当IOException 发生时,没有更多内容可以写入输出,因此我需要以某种方式停止进一步处理。)
【问题讨论】: