【问题标题】:call part of stream once with multiple subscribers?与多个订阅者一起调用一次流的一部分?
【发布时间】:2017-10-12 08:09:03
【问题描述】:

我有用于套接字通信的自定义 Rx 适配器。 在它之外,我用消息观察 Flowable。 然后我有一些管理器来处理每条消息,然后进一步发出它。

       fun observeSocket() = socketManager
                .observe()
                .doOnNext{
                    insideMessageHandler.handle(it)
                }

然后我有两个订阅者会观察Socket().subscribe()

问题在于每条消息 insideMessageHandler.handle(it) 都会被调用两次。我想找到流的一部分对于每个订阅者来说都是通用的方式。不幸的是,observeSocket() 末尾的 .share() 运算符不起作用。

我有两次这样的事情:

         /onNextInside
Flowable/-onNextOutsideSubscriber1
Flowable\-onNextOutsideSubscriber2
         \-onNextInside

我想要这样的东西:

         /-onNextInside
Flowable/-onNextOutsideSubscriber1
        \-onNextOutsideSubscriber2

在代码中看起来像

insideManager.observeSocket().subscribe({do something first})
insideManager.observeSocket().subscribe({do something second})

问题在于,在这种情况下,我调用了两次 onNextInside

有可能吗?

【问题讨论】:

    标签: android sockets kotlin rx-java rx-java2


    【解决方案1】:

    问题在于 observable 的重新创建:

    fun observeSocket() = socketManager
            .observe()
            .doOnNext{
                insideMessageHandler.handle(it)
            }
    

    每次调用observeSocket() 都会创建一个新链,因此将share() 放在那里不会有什么不同。

    将此链定义为共享单例:

    private val _observeSocket = socketManager
            .observe()
            .doOnNext{
                insideMessageHandler.handle(it)
            }
            .share()
    
    
    fun observeSocket() = _observeSocket
    

    【讨论】:

    • 喜欢答案 ;-)
    • 太棒了,它几乎可以完美运行!更改了操作员的命令,现在正是我想要的。谢谢!
    【解决方案2】:

    对于这种情况,您有主题。

    var yourSubject = PublishSubject<Type>.create()
    fun observeSocket() = socketManager.doOnNext(yourSubject::onNext)
    

    然后你可以在你的 observeSocket() 和另一个/多个订阅者上观察这个主题。

    如果你想共享一个 Observables 的发射,你可以使用 share() 操作符,它确保即使订阅了多个订阅者,它也只创建一个实例并使用共享发射的数据

    fun observeSocket() = socketManager
                .doOnNext(insideMessageHandler::handle)
                .share()
    

    无论如何,为了这个目的,将反应性的东西扁平化/发射到 doOnNext 中的另一个可观察对象并不是一个好的用例。

    最好共享一个 Flowable 并多次订阅并将值平面映射到您的需要,而不是将所有内容都放在一个 flowable 中,因为如果流中出现错误,整个流可能不会再发出任何数据。

    【讨论】:

    • doOnNext 不发出任何东西,它只是对对象做某事,例如记录它我将修复我的绘图:)
    • 好的,等一下。几分钟后检查。您能否提供示例数据,因为它并不清楚您的要求是什么
    猜你喜欢
    • 2016-05-03
    • 1970-01-01
    • 2019-02-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多