【问题标题】:RxJava: why subscribing to shared observable changes emitted itemsRxJava:为什么订阅共享的可观察更改会发出项目
【发布时间】:2018-02-07 23:10:47
【问题描述】:

我偶然发现了一些我无法解释的令人费解的行为。我从一个更大的 rx 链中提炼出下面的例子,所以不要惊讶为什么我会这样做。我只是想了解为什么会这样! :)

enum class Request {
    Request1,
    Request2
}


fun main(args: Array<String>) {
    val requestStream = PublishSubject.create<Request>()

    val stateChanges = requestStream.share()

    stateChanges
        .delaySubscription(requestStream)
        .subscribe({ println("received $it") })

    // Comment this and it changes the output!
    stateChanges.subscribe()

    requestStream.onNext(Request.Request1)
    requestStream.onNext(Request.Request2)
}

所以。如果你运行上面的程序,它会打印:

received Request1
received Request2

但是如果你评论stateChanges.subscribe(),突然Request1 会丢失并且它只打印这个:

received Request2

你能解释一下吗? 另外我想知道即使没有额外的subscribe(),上述设置是否可以同时发出这两个项目。

【问题讨论】:

    标签: rx-java reactive-programming rx-java2


    【解决方案1】:

    在正常情况下,share 已经连接到requestStream 并且由于delaySubscription(requestStream)sharerequestStream 向其发送Request1 之前获得了第二个Observer。所以requestStream 有两个Observers,向第一个发送Request1 会向第二个消费者share 添加另一个Observer,因此,那里的最终订阅者会得到Request1。

    在注释掉的情况下,share 还没有连接到requestStream,因此requestStream 只能通知delaySubscriptiondelaySubscription 触发 share,然后订阅 requestStream。但是,PublishSubject 仅将项目发送到Observers 的当前快照,同时看不到它的第一个onNext 添加了一个新的Observer。因此,Request1 没有到达println

    PublishSubject 未处理此极端情况,因为它需要其onNext 记住哪个Observer 已收到当前项目并继续重试以防当前Observer 的集合发生更改。这会增加内存和时间开销。

    【讨论】:

    • 感谢您的详细解答!我需要一些安静的时间来处理它。
    猜你喜欢
    • 2018-04-29
    • 2017-11-14
    • 2018-07-28
    • 1970-01-01
    • 2015-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多