【发布时间】:2016-11-24 02:45:08
【问题描述】:
需要帮助在主线程上进行可观察的启动,然后转到线程池以允许源继续发出新项目(无论它们是否仍在线程池中处理)。
这是我的例子:
public static void main(String[] args) {
Observable<Integer> source = Observable.range(1,10);
source.map(i -> sleep(i, 10))
.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(i -> sleep(i * 10, 300))
.subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
sleep(-1, 30000);
}
private static int sleep(int i, int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
总是打印:
Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationScheduler-1
Emitting 4 on thread main
Emitting 5 on thread main
Emitting 6 on thread main
Received 20 on thread RxComputationScheduler-1
Emitting 7 on thread main
Emitting 8 on thread main
Emitting 9 on thread main
Received 30 on thread RxComputationScheduler-1
Emitting 10 on thread main
Received 40 on thread RxComputationScheduler-1
Received 50 on thread RxComputationScheduler-1
Received 60 on thread RxComputationScheduler-1
Received 70 on thread RxComputationScheduler-1
Received 80 on thread RxComputationScheduler-1
Received 90 on thread RxComputationScheduler-1
Received 100 on thread RxComputationScheduler-1
虽然项目按预期在主线程上发出,但我希望它们之后继续移动到计算/IO 线程池。
应该是这样的:
【问题讨论】:
-
我认为在这个例子中一切都发生得如此之快,它似乎在
observeOn()之前被阻止,即使它不是。让我看看我是否可以夸大睡眠时间以证明它是有效的。
标签: java multithreading rx-java