【发布时间】:2019-07-23 09:47:36
【问题描述】:
TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间只使用一个线程?
在我看来,RxJava2 通常是顺序的,除非通过诸如 parallel() 之类的东西以其他方式表示。即使使用observeOn/subscribeOn,我也看到有例如doOnNext() 永远不会有两个线程同时运行:
AtomicInteger counter = new AtomicInteger();
PublishSubject<Integer> testSubject = PublishSubject.create();
testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
if(counter.incrementAndGet() > 1)
System.out.println("Whoa!!!!"); // <- never happens
Thread.sleep(20);
counter.decrementAndGet();
})
.subscribe();
for (int i = 0; i < 10000; i++) {
Thread.sleep(10);
testSubject.onNext(i);
}
无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核,否则我看不到 doOnNext 同时在不同线程上运行。
我想依赖这个特性,这样我就可以忽略我的操作符中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了,谁能指出合同的这一部分是在哪里描述的?
谢谢!
【问题讨论】: