【问题标题】:Observable.zip chooses different schedulers to subscribe on different callsObservable.zip 选择不同的调度程序来订阅不同的调用
【发布时间】:2017-03-27 14:44:47
【问题描述】:

我有两个 observables o1 和 o2。我将它们压缩到 Observable.zip() 函数上,但订阅每次都在不同的调度程序上。我希望所有 o1、o2 和 z observable 都应该订阅 Schedulers.io() observable。但每次都是随机的 Schedulers.io() 或 Schedulers.computation()。

这是我重现问题的源代码

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxZipSchedulers {

    public static void main(String[] args) {

        for(int i=0;i<100;i++) {
            Observable<String> o1 = Observable.just("o1").subscribeOn(Schedulers.computation());
            Observable<String> o2 = Observable.just("o2");

            Observable z = Observable.zip(o1, o2, (s1, s2) -> s1 + " " + s2 + " " + Thread.currentThread());

            z.subscribeOn(Schedulers.io())
                    .subscribe(res -> {
                        System.out.println(res);
                    });

            z.toCompletable().await();
        }
    }

}

在我的机器上,输出是这样的(每次注意 RxComputationScheduler 或 RxIoScheduler):

o1 o2 线程[RxComputationScheduler-1,5,main]

o1 o2 线程[RxComputationScheduler-4,5,main]

o1 o2 线程[RxComputationScheduler-1,5,main]

o1 o2 线程[RxComputationScheduler-3,5,main]

o1 o2 线程[RxComputationScheduler-4,5,main]

o1 o2 线程[RxComputationScheduler-3,5,main]

o1 o2 线程[RxIoScheduler-3,5,main]

o1 o2 线程[RxIoScheduler-2,5,main]

o1 o2 线程[RxComputationScheduler-1,5,main]

o1 o2 线程[RxComputationScheduler-3,5,main]

o1 o2 线程[RxIoScheduler-3,5,main]

o1 o2 线程[RxIoScheduler-2,5,main]

为什么所有的 o1, o2, z observables 都不订阅 Schedulers.io() ?我认为这个调度器应该在整个 observables 链上传播,但这只是偶尔发生。

【问题讨论】:

    标签: java rx-java reactivex


    【解决方案1】:

    zip() 运算符的情况是,zip 函数将在最后一个发射 Observable 的线程(根据 Scheduler)上调用,即在这种情况下,随机 o1o2 调度程序是 computation()io() 和 .

    为什么每个调度程序都是computation()io()
    虽然您在压缩包Observable 中指定了调度程序,但它只影响zip() 的订阅操作,不一定影响每个压缩包Observable 的作用。

    在RxJava中每个Observable都可以指定自己的Scheduler,当zip()会订阅压缩后的Observable,每个Scheduler都会对配置好的Scheduler进行操作:
    o1 的情况下 - computation 因为它是使用 subscribeOn() 显式配置的。
    但是,如果没有为 Observable 指定 Scheduler,那么 Observable 将作用于Scheduler 订阅已完成 - 在您的情况下,这就是 o2 发生的情况,因为它的订阅在调用 zip 订阅的地方被调用 - io()

    如果您出于某种原因关心 zip() 函数发生的位置,您可以简单地继续使用 2 个压缩对象,并使用 observerOn() 更改操作位置并使用 map() 执行自定义压缩逻辑。

    【讨论】:

      猜你喜欢
      • 2013-12-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-04
      • 2019-07-24
      • 2015-06-25
      • 1970-01-01
      • 2018-02-19
      相关资源
      最近更新 更多