【问题标题】:Why does my RxJava Observable emit only to the first consumer?为什么我的 RxJava Observable 只向第一个消费者发出?
【发布时间】:2016-03-21 22:37:04
【问题描述】:

谁能解释为什么下面的测试失败了?

public class ObservableTest {
    @Test
    public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() {
        // Any simpler observable makes the test pass
        Observable<Integer> badObservable = Observable.just(1)
                .zipWith(Observable.just(2), (one, two) -> Observable.just(3))
                .flatMap(observable -> observable);

        ObservableCalculator calc1 = new ObservableCalculator(badObservable);
        ObservableCalculator calc2 = new ObservableCalculator(badObservable);

        // zipping causes the failure
        // Calling calculate().toBlocking().subscribe() on each calc passes
        // Observable.from(listOfCalcs).flatMap(calc -> calc.calculate()) passes
        Observable.zip(ImmutableList.of(calc1.calculate(), calc2.calculate()), results -> results)
                .toBlocking()
                .subscribe();

        assertThat(calc1.hasCalculated).isTrue();
        assertThat(calc2.hasCalculated).isTrue(); // this fails
    }

    private static class ObservableCalculator {
        private final Observable<?> observable;

        public boolean hasCalculated = false;

        public ObservableCalculator(Observable<?> observable) {
            this.observable = observable;
        }

        public Observable<Void> calculate() {
            return observable.concatMap(o -> {
                hasCalculated = true;
                // returning Observable.just(null) makes the test pass
                return Observable.empty();
            });
        }
    }
}

我已尝试进一步简化“坏”可观察对象,但找不到可以删除的任何内容以使其更简单。

不过,我目前的理解是,它是一个 Observable,它(不管它是如何构造的)应该发出一个值然后完成。然后,我们基于该 Observable 创建一个对象的两个相似实例,并在这些对象上调用一个方法来消耗 Observable,记下已完成的操作,然后返回 Observable.empty()。

谁能解释为什么使用这个 observable 会导致测试失败(当使用更简单的 observable 会导致测试通过时)?

也可以通过连续调用 calculate().toBlocking().subscribe() 而不是使用 zip 或让计算返回 Observable.just(null) 来使测试通过。这对我来说 some 有意义(如果 calc1 为空,zip 将不会订阅 calc2,因为在这种情况下 zip 永远不会产生任何东西),但不完全有意义(我不明白为什么 zip对于更简单的 badObservable 版本而言,其行为并非如此——无论输入如何,calculate() 方法仍返回空)。

【问题讨论】:

    标签: java reactive-programming rx-java


    【解决方案1】:

    如果您用某些东西压缩一个空源,操作员会检测到它不能再产生任何值并取消订阅它的所有源。涉及到 zip 和 merge 的混合,并且 merge 非常重视取消订阅:它根本不会发出值 3,因此 concatMap 也不会调用第二个源的映射函数。

    【讨论】:

    • 谢谢。仍然不确定我是否理解。所以你说 zip 订阅了第一个 calc observable(这意味着 concatMap 的 lambda 被执行),意识到它是空的,所以在它被订阅之前取消订阅 calc2?如果是这样,如果源 observable 是一个简单的 Observable.just(1),为什么 calc2 的 lambda 会被执行?当您说“有 zip 和 merge 混合使用”时,您能解释一下合并的位置吗?
    • mergeflatMap(v -&gt; v) 中。是的,您甚至可以在订阅之前取消订阅,因为Subscriber 也是Subscription。在调用Observable.subscribe() 之前在Subscriber 上调用unsubscribe() 会立即取消订阅。您的第二个 zip 归结为以下代码:zip(Observable.empty(), Observable.empty(), v -&gt; v).toBlocking().subscribe() 为空或 zip(just(null), just(null), v -&gt; v) 具有一个值。
    • 好的,那么如果是mergeunsubscribe()敏感,并且在flatMap中,那为什么badObservable = Observable.just(1).flatMap(Observable::just)测试通过了?它仍然有一个flatMap,所以仍然有一个merge,它仍然会从剩余的zip 中退订(看到Observable.empty()),那么有什么不同?
    • 这会发出单个值,而您的原始值不会。如果 zip 的第一个源发出一个值,则 zip 不会取消订阅其他源,因此第二个源有机会发出。
    • 这是真的吗?据我所知,badObservable 的两种形式(一种同时具有zipflatMap,另一种只有flatMap)发出一个值。另外,据我了解,ObservableCalculator.calculate() 方法(输入到第二个zip)将总是返回Object.empty(),无论badObservable 是什么。所以我希望第二个zip 总是取消订阅第二个来源,但某些形式的badObservable 似乎并不在意。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多