【问题标题】:Does subscribeOn for combineLatest do something?combineLatest 的 subscribeOn 有什么作用吗?
【发布时间】:2019-02-15 08:13:48
【问题描述】:

像这样在Observable.combineLatest()之后添加subscribeOn()有什么意义吗:

Observable.combineLatest(
  someObservable,
  theOtherObservable,
  (something, theOther) -> iAmFunction(something, theOther)))
  .subscribeOn(Schedulers.computation())
  ...

我所理解的iAmFunction() 将在任何调度程序上调用,即被合并的可观察的 emmit 最后一次。

那么最后那个subscribeOn()的目的是什么?

【问题讨论】:

    标签: rx-java


    【解决方案1】:

    subscribeOn 指定订阅副作用将发生的位置,并且不保证您在它使用的线程上获得项目。请改用observeOn

    Observable.combineLatest(
        someObservable.observeOn(Schedulers.single()),
        theOtherObservable.observeOn(Schedulers.single()),
        (something, theOther) -> iAmFunction(something, theOther))
    )
    

    订阅时combineLatest 的作用是订阅其源,因此您在computation() 调度程序的线程上订阅someObservabletheOtherObservable。但是请注意,如果someObservable 不交还控制权,theOtherObservable 将不会被订阅。将subscribeOn 指定在尽可能靠近源的位置总是更好:

    Observable.combineLatest(
        someObservable.subscribeOn(Schedulers.computation()),
        theOtherObservable.subscribeOn(Schedulers.computation()),
        (something, theOther) -> iAmFunction(something, theOther))
    )
    

    【讨论】:

      【解决方案2】:

      在这个问题中,subscribeOn 决定 which thread executes the iAmFunction() (Link)。基本上subscribeOn 决定发射线程。

      SubscribeOn 操作符指定 Observable 将开始操作的线程,无论该操作符在操作符链中的哪个点被调用。

      例如有两个 combineLatest 逻辑。

      fun main(args: Array<String>) {
      
          Observables.combineLatest(
              Observable.just("o1"),
              Observable.just("o2")
          ) { _, _ -> Thread.currentThread().name }
              .subscribe {
                  println("Without subscribeOn")
                  println("in combineLatest: $it")
                  println("in subscribe: ${Thread.currentThread().name}")
              }
      
          println()
      
          Observables.combineLatest(
              Observable.just("o1"),
              Observable.just("o2")
          ) { _, _ -> Thread.currentThread().name }
              .subscribeOn(Schedulers.io())
              .subscribe {
                  println("With subscribeOn")
                  println("in combineLatest: $it")
                  println("in subscribe: ${Thread.currentThread().name}")
      
              }
      
          Thread.sleep(500)
      }
      
      Without subscribeOn
      in combineLatest: main
      in subscribe: main
      
      With subscribeOn
      in combineLatest: RxCachedThreadScheduler-1
      in subscribe: RxComputationThreadPool-1
      

      如您所见,subscribeOn 更改了combineLatestsubscribe 的线程。如果你不使用observeOn,它可以从emitting itemssubscribe

      【讨论】:

      • 如何解释 CachedThreadScheduler / ComputationThreadPool?
      • 这是不正确的。 subscribeOn 指定订阅副作用将发生的位置,并且不保证您在它使用的线程上获得项目。
      猜你喜欢
      • 1970-01-01
      • 2017-12-12
      • 2017-04-09
      • 1970-01-01
      • 2016-01-13
      • 2011-10-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多