【发布时间】:2016-04-17 09:57:11
【问题描述】:
我有一个使用 ConnectableObservable 的应用程序,它运行了很长时间。一段时间后,它的观察者神秘地停止在其 onNext() 方法中收到通知。
我编写了以下简化示例的测试。它只是一个带有无限循环的 ConnectableObservable,一个订阅者同时使用 observeOn 和 subscribeon。在 128 s.onNext(1) 调用后,它会停止通知观察者。
@Test
public void testHotObservable() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
while(true){
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(1);
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.publish();
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer i) {
System.out.println("got "+i);
}
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
};
observable.subscribe(observer);
observable.connect();
latch.await();
}
这是我在调试RxJava的代码时看到的我发现它没有调用Observer的onNext()方法但我不明白的原因:
1.- s.onNext(1); 被调用:
2.- 执行到rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue():
void pollQueue() {
int emitted = 0;
final AtomicLong localRequested = this.requested;
final AtomicLong localCounter = this.counter;
do {
localCounter.set(1);
long produced = 0;
long r = localRequested.get();
for (;;) {
...
System.out.println("R: "+r);
if (r > 0) {
Object o = queue.poll();
if (o != null) {
child.onNext(on.getValue(o));
r--;
问题在于 r 的值。第一次执行时,它的值始终为 128。每次调用后,它减 1 (r--)。这意味着ConnectableObservable 在同时使用observeOn 和subscribeOn 时只能通知其观察者128 次。如果我删除 subscribeOn,r 的值会从每次迭代开始,并且它可以工作。
更新:
我找到了解决方案:问题是由.observerOn().subscribeOn() 的顺序引起的。如果我将它反转为.subscribeOn().observeOn() 它可以工作(我可以看到r 的值总是重置为128)。
无论如何,我会很感激解释。
【问题讨论】:
-
state和signal是可变变量吗? -
不...有必要吗?
-
是的,否则跨线程的更改可能不可见,或者变量读取可能会完全脱离循环。
-
添加了 volatile 但它仍然会在一段时间后停止...我想知道这是否可能是因为
s.onNext(signal);之后的signal = null;声明...我没有意义不过。 -
你订阅了 observable 吗?似乎没有任何信号消耗您的信号,内部缓冲区刚刚填满。
标签: java reactive-programming rx-java