【问题标题】:RxJava - ConnectableObservable can't notify its observers more than 128 times when using observeOn and subscribeOn simultaneouslyRxJava - 当同时使用 observeOn 和 subscribeOn 时,ConnectableObservable 无法通知其观察者超过 128 次
【发布时间】:2016-04-17 09:57:11
【问题描述】:

我有一个使用 ConnectableObservable 的应用程序,它运行了很长时间。一段时间后,它的观察者神秘地停止在其 onNext() 方法中收到通知。

我编写了以下简化示例的测试。它只是一个带有无限循环的 ConnectableObservable,一个订阅者同时使用 observeOn 和 subscribeon。在 128 s.onNext(1) 调用后,它会停止通知观察者。

@Test
public void testHotObservable() throws InterruptedException{

    CountDownLatch latch = new CountDownLatch(1);

    ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
        while(true){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(1);
        }
    })
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .publish();

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println("got "+i);
        }
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    observable.subscribe(observer);
    observable.connect();

    latch.await();
}

这是我在调试RxJava的代码时看到的我发现它没有调用Observer的onNext()方法但我不明白的原因:

1.- s.onNext(1); 被调用:

2.- 执行到rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue():

void pollQueue() {
    int emitted = 0;
    final AtomicLong localRequested = this.requested;
    final AtomicLong localCounter = this.counter;
    do {
        localCounter.set(1);
        long produced = 0;
        long r = localRequested.get();            
        for (;;) {
            ...
            System.out.println("R: "+r);
            if (r > 0) {
                Object o = queue.poll();
                if (o != null) {
                    child.onNext(on.getValue(o));
                    r--;

问题在于 r 的值。第一次执行时,它的值始终为 128。每次调用后,它减 1 (r--)。这意味着ConnectableObservable 在同时使用observeOn 和subscribeOn 时只能通知其观察者128 次。如果我删除 subscribeOn,r 的值会从每次迭代开始,并且它可以工作。

更新

我找到了解决方案:问题是由.observerOn().subscribeOn() 的顺序引起的。如果我将它反转为.subscribeOn().observeOn() 它可以工作(我可以看到r 的值总是重置为128)。

无论如何,我会很感激解释。

【问题讨论】:

  • statesignal 是可变变量吗?
  • 不...有必要吗?
  • 是的,否则跨线程的更改可能不可见,或者变量读取可能会完全脱离循环。
  • 添加了 volatile 但它仍然会在一段时间后停止...我想知道这是否可能是因为s.onNext(signal); 之后的signal = null; 声明...我没有意义不过。
  • 你订阅了 observable 吗?似乎没有任何信号消耗您的信号,内部缓冲区刚刚填满。

标签: java reactive-programming rx-java


【解决方案1】:

许多异步操作符使用内部的、固定大小的缓冲区,并依赖于订阅者的频繁请求。在你的情况下,有些东西没有正确请求,我不能说它是什么。我建议您使用标准组件尝试您的用例,看看可能出了什么问题,也就是说,您可以用 PublishSubject + 示例替换您的自定义 Observable:

Subject<Integer, Integer> source = PublishSubject.<Integer>create().toSerialized();

ConnectableObservable<Integer> co = source.sample(
    500, TimeUnit.MILLISECONDS, Schedulers.io())
.onBackpressureBuffer().publish();

co.subscribe(yourSubscriber);
co.connect();

source.onNext(1);

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-14
    • 1970-01-01
    • 2012-05-11
    • 1970-01-01
    相关资源
    最近更新 更多