【发布时间】:2017-03-27 14:44:47
【问题描述】:
我有两个 observables o1 和 o2。我将它们压缩到 Observable.zip() 函数上,但订阅每次都在不同的调度程序上。我希望所有 o1、o2 和 z observable 都应该订阅 Schedulers.io() observable。但每次都是随机的 Schedulers.io() 或 Schedulers.computation()。
这是我重现问题的源代码
import rx.Observable;
import rx.schedulers.Schedulers;
public class RxZipSchedulers {
public static void main(String[] args) {
for(int i=0;i<100;i++) {
Observable<String> o1 = Observable.just("o1").subscribeOn(Schedulers.computation());
Observable<String> o2 = Observable.just("o2");
Observable z = Observable.zip(o1, o2, (s1, s2) -> s1 + " " + s2 + " " + Thread.currentThread());
z.subscribeOn(Schedulers.io())
.subscribe(res -> {
System.out.println(res);
});
z.toCompletable().await();
}
}
}
在我的机器上,输出是这样的(每次注意 RxComputationScheduler 或 RxIoScheduler):
o1 o2 线程[RxComputationScheduler-1,5,main]
o1 o2 线程[RxComputationScheduler-4,5,main]
o1 o2 线程[RxComputationScheduler-1,5,main]
o1 o2 线程[RxComputationScheduler-3,5,main]
o1 o2 线程[RxComputationScheduler-4,5,main]
o1 o2 线程[RxComputationScheduler-3,5,main]
o1 o2 线程[RxIoScheduler-3,5,main]
o1 o2 线程[RxIoScheduler-2,5,main]
o1 o2 线程[RxComputationScheduler-1,5,main]
o1 o2 线程[RxComputationScheduler-3,5,main]
o1 o2 线程[RxIoScheduler-3,5,main]
o1 o2 线程[RxIoScheduler-2,5,main]
为什么所有的 o1, o2, z observables 都不订阅 Schedulers.io() ?我认为这个调度器应该在整个 observables 链上传播,但这只是偶尔发生。
【问题讨论】: