【问题标题】:Unexpected behavior with RxJava2 PublishSubjectRxJava2 PublishSubject 的意外行为
【发布时间】:2017-11-21 22:01:14
【问题描述】:

我有以下代码使用PublishSubject

val subject = PublishSubject.create<Int>()

val o1: Observable<String> =
        subject.observeOn(Schedulers.newThread()).map { i: Int ->
            println("${Thread.currentThread()} | ${Date()} | map => $i")
            i.toString()
        }

o1.subscribe {
    println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}

o1.subscribe {
    println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}

o1.subscribe {
    println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}

println("${Thread.currentThread()} | ${Date()} | submitting 1")

subject.onNext(1)

1) 我从它创建一个Observable 并对其进行映射(出于本示例的目的,我只是转换为String)=> o1

2) 然后我订阅o1 3 次。

3) 最后我通过调用subject.onNext(1)“发布”一个事件。

令我惊讶的是,我得到了以下输出:

Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1

map 最终被调用了 3 次,我不明白为什么,因为我订阅了 o1,这应该发生在 map 发生之后。我肯定错过了什么。任何帮助将不胜感激。

谢谢 颜

【问题讨论】:

  • 您订阅o1 3 次,每次都创建一个独立的序列,直到PublishSubjectonNext 发送到所有3 个链。
  • 你说“直到PublishSubject”:为什么一直到主题?你能指出我在哪里可以解释吗?如果这是正常行为,是否有办法在映射后转换流,使其不这样做?
  • 因为从所有 3 个订阅者的角度来看,PublishSubject 是一个多播源,它通过 subscribe() 调用建立的独立链向他们发送事件信号。
  • 在主题上应用运算符通常不会使整个链变热,因为这些运算符元素仅在订阅时才附加到源主题。因此,多个订阅将产生多个频道到同一个上游主题。
  • 我想这是有道理的。那么如何避免这种情况呢?如果我有subject.map(...).filter(...).map(...).... 一系列复杂且计算量大的转换怎么办。让每个订阅重做相同的计算会非常昂贵。你会怎么做?

标签: rx-java2


【解决方案1】:

来自 cmets:

您订阅o1 三次,每次都创建一个独立的序列,直到PublishSubjectonNext 发送到所有3 个链。

从所有 3 个订阅者的角度来看,PublishSubject 是一个多播源,通过subscribe() 调用建立的独立链向他们发送事件信号。

Subject 上应用运算符通常不会使整个链变热,因为这些运算符元素仅在订阅时才附加到源Subject。因此,多个订阅将产生多个通道到同一上游Subject

使用publish 得到一个ConnectableObservable(或在最后的另一个PublishSubject)以使序列从那时开始变得热门。

【讨论】:

  • 我确认在map 之后添加.publish.autoConnect() 是正确的。我不是 100% 确定您所说的 或另一个 PublishSubject 是什么意思,但我使其工作的方式是添加 .subscribe { otherSubject.onNext(it) } 并让订阅发生在其他主题上。
  • 为了结束循环,我创建了一个要点来演示 gist.github.com/ypujante/2ab3c3a135272ea4bc4554cbfc287ca7 的 2 个选项。最后我选择了第二个主题的选项,因为它对我来说似乎更干净。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-28
  • 1970-01-01
相关资源
最近更新 更多