【问题标题】:Implementation of a "valve" for Observable streams, including buffering the last element emmitted before the valve reopened为 Observable 流实现“阀门”,包括缓冲阀门重新打开之前发出的最后一个元素
【发布时间】:2017-06-18 17:03:12
【问题描述】:

我正在努力思考如何在 RxJava (2.0) 中实现某些东西。它适用于 Android,我使用的是 Kotlin,尽管平台和语言的选择在这里并不重要。

我的想法是基于 RxJava 构建某种 MVP 架构。在这个实现中,我正在考虑一个 Activity(也可以是 Fragment 或自定义 View)公开一个值流(为简单起见,Booleans),它指示生命周期事件,或者视图是否被附加或分离。

基本思路是这样的:

private val lifecycleEvents = PublishSubject.create<Boolean>()
val screenStates: Observable<Boolean> = lifecycleEvents.hide()

override fun onResume() {
    super.onResume()
    lifecycleEvents.onNext(true) // I'm attached!
}

override fun onPause() {
    lifecycleEvents.onNext(false) // I'm detached!
    super.onPause()
}

override fun onDestroy() {
    lifecycleEvents.onComplete() // I'm gone        
    super.onDestroy()
}

然后从另一端,Presenter 公开了一个 Observable,它是一个表示屏幕状态的对象流 - 将由 View 呈现。

(这遵循本系列 http://hannesdorfmann.com/android/mosby3-mvi-1 中解释的概念 - 归结为 Presenter 向 View 提供包含完整屏幕状态的独立对象而不是 View 上的多个不同方法的事实)。

然后我想绑定这两个可观察的流,以便:

  • 每当 View 分离时,Presenter 的输入都会被忽略(并且不会缓冲,以免遇到任何背压问题)

  • 然而,一旦视图被重新连接,它就会获取 Presenter 发出的 最新 状态。也就是说,最多只能缓存一个状态实例。

它将按如下方式工作(为简单起见,假设状态为 String 类型):

val merged: Observable<String> = ???

val attached = true
val disattached = false        

screenStates.onNext(attached)
fromPresenter.onNext("state A")
fromPresenter.onNext("state B")

screenStates.onNext(disattached)
fromPresenter.onNext("state C") // this won't survive at the end
fromPresenter.onNext("state D") // this will "override" the previous one.
// as that's the last state from BEFORE the screen is reattached

screenStates.onNext(attached)
// "state D" should be replayed at this point, "state C" is skipped and lost

fromPresenter.onNext("state E")

// what "merged" is supposed to have received at this point:
// "state A", "state B", "state D", "state E"

我不确定最好的惯用解决方案是什么。

我尝试将其实现为ObservableTransformer,但我无法完全正确。我相信变压器应该是无状态的,而我的解决方案倾向于明确跟踪发出的内容并“手动”缓冲最后一个元素等,这感觉很混乱而且太紧迫了,所以我认为这是错误的。

我找到了https://github.com/akarnokd/RxJava2Extensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableValve.java,但实现看起来非常复杂,我不敢相信它不能以更简单的方式完成(我不需要所有的灵活性,我只想要适用于所描述的东西用例)。

任何见解都将不胜感激,包括在 Android 的背景下是否还有其他我应该考虑的事情。另请注意,我不使用 RxKotlin 绑定(我可能会,我只是不认为这里应该需要它们)。

编辑:

以下是我当前的实现。正如我所说,我对此不太满意,因为它是明确的有状态的,我相信这应该以声明的方式实现,利用 RxJava 的一些结构。

我需要合并两个不同类型的流,因为combineLatestzip 都没有做到这一点,所以我使用了一个技巧,为两种不同类型的事件创建了一个通用包装器。它又引入了一定的开销。

sealed class Event
class StateEvent(val state: String): Event()
class LifecycleEvent(val attached: Boolean): Event()

class ValveTransformer(val valve: Observable<Boolean>) : ObservableTransformer<String, String> {
    var lastStateEvent: Event? = null
    var lastLifecycleEvent = LifecycleEvent(false)

    private fun buffer(event: StateEvent) {
        lastStateEvent = event
    }

    private fun buffer(event: LifecycleEvent) {
        lastLifecycleEvent = event
    }

    private fun popLastState(): String {
        val bufferedState = (lastStateEvent as StateEvent).state
        lastStateEvent = null
        return bufferedState
    }

    override fun apply(upstream: Observable<String>): ObservableSource<String> = Observable
            .merge(
                    upstream.map(::StateEvent).doOnNext { buffer(it) }, 
                    valve.distinctUntilChanged().map(::LifecycleEvent).doOnNext { buffer (it) })
            .switchMap { when {
                it is LifecycleEvent && it.attached && lastStateEvent != null ->
                    // the screen is attached now, pump the pending state out of the buffer
                    just(popLastState())
                it is StateEvent && lastLifecycleEvent.attached -> just(it.state)
                else -> empty<String>()
            } }
}

【问题讨论】:

    标签: android rx-java observable reactive-programming rx-kotlin


    【解决方案1】:

    将@TpoM6oH 的答案与原始提案相结合:

    val bufferedEvent: Observable<Event> = BehaviorSubject.create()
    bufferedEventResult = valve.switchMap( 
         viewEvent -> if (viewEvent) 
                           bufferedEvent 
                      else Observable.never() )
    

    switchMap() 运算符负责订阅和取消订阅。

    然后,您可以使用 publish() 将生成的 observable 拆分为必要的状态和事件。我不确定ObservableTransformer 的需求是什么。

    【讨论】:

    • 谢谢鲍勃!我现在不能尝试(工作太忙,这只是我的副业),但我会开始的。我赞成您的回答,并在测试解决方案后接受。
    • 老实说,我不确定如何将此代码放入我的ObservableTransformer... 为什么要同时缓冲两者?我只需要缓冲状态,而不是生命周期事件
    • 好吧,我会缓冲生命周期事件。仍然不确定它应该如何包含在 {{ObservableTransformer}} 中。例如。我如何以及在哪里订阅 {{bufferedState}} 到传入流(受转换的影响)?之后我将如何以及在哪里取消订阅?
    • 在我的示例中有两个单独的开关,因为我不明白您需要将它们组合在一起。我将编辑我的示例。
    • 谢谢。 Transformer 的想法和目的是解决 MVP/Android 中的生命周期问题。可以附加和重新附加视图(在 Android 上,实际上重新初始化了相同的屏幕,例如,当设备旋转时)并且演示者应该保留以“屏幕状态”提供视图,直到重新附加。它通常通过在 Presenter 本身上实现生命周期方法并在它发生时订阅/取消订阅 View 来完成。我希望 View 发出一个(比如说)布尔流,用作“阀门”,并在单个流级别上解决这个问题而无需取消订阅
    【解决方案2】:

    在我看来,您正在寻找 BehaviorSubject - 这是一个向每个订阅的观察者发出最近观察到的项目和所有后续观察到的项目的主题。

    如果您在演示者中使用它,请在分离视图时取消订阅,并在附加视图时订阅它,您应该会得到您想要的。

    【讨论】:

    • 感谢您的回复。我知道BehaviorSubject 以及以这种方式实现它的可能性。但我想知道它是否可以实现 取消订阅和重新订阅。我的意思是,我知道我们最终必须取消订阅,但我只想在视图被破坏时取消订阅 - 而不是每次暂停时。
    • 在不取消订阅的情况下完全做到这一点似乎很棘手,但您可以将该逻辑移至演示者,为您的所有视图事件设置一个行为主题,另一个主题将暴露给视图,取消订阅暴露的当您收到 detach 事件时从行为主题中获取主题,并在您收到 attached 请求时订阅。然后你可以把它移到你的presenter的基类中,它看起来会很不错。
    • 我猜这也是可能的。无论如何,我编辑了粘贴我当前实现的问题——它和我想出的一样好。显然,Kotlin 的表现力在这里派上了用场——等效 Java 实现的丑陋会更加明显。
    猜你喜欢
    • 2011-09-28
    • 1970-01-01
    • 1970-01-01
    • 2010-12-22
    • 1970-01-01
    • 1970-01-01
    • 2015-05-06
    • 1970-01-01
    • 2018-08-30
    相关资源
    最近更新 更多