【问题标题】:How can I implement RxJava's combineLatest on Kotlin co-routines receive channel?如何在 Kotlin 协程接收通道上实现 RxJava 的 combineLatest?
【发布时间】:2018-03-11 00:40:32
【问题描述】:

我需要在ReceiveChannel 上实现以下.combineLatest() 扩展功能

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    // ?
}

我希望它像 RxJava 的 combineLatest() 一样运行。

我该怎么做?

编辑:到目前为止,我有这个,但它不起作用。 sourceB.consumeEach{ } 块永远不会被执行。

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {

    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    var latestA: A? = null
    var latestB: B? = null

    sourceA.consumeEach { a ->
        latestA = a
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }

    sourceB.consumeEach { b ->
        latestB = b
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }
}

我还想确保当这个函数返回的ReceiveChannel&lt;R&gt;被关闭(取消订阅)时,我想确保父频道正确关闭。

【问题讨论】:

    标签: kotlin reactive-programming kotlinx.coroutines


    【解决方案1】:

    这成功了!我仍然很困惑为什么我可以将一个 .consumeEach{ } 嵌套在另一个 .consumeEach { } 中 - 这似乎不直观。

    suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
        otherSource: ReceiveChannel<B>,
        context: CoroutineContext = Unconfined,
        combineFunction: suspend (A, B) -> R
    ): ReceiveChannel<R> = produce(context) {
    
        val sourceA: ReceiveChannel<A> = this@combineLatest
        val sourceB: ReceiveChannel<B> = otherSource
    
        val latestA = AtomicReference<A>()
        val latestB = AtomicReference<B>()
    
        var aInitialized = false
        var bInitialized = false
    
        sourceA.consumeEach { a ->
            latestA.set(a)
            aInitialized = true
            if (aInitialized && bInitialized) {
                send(combineFunction(latestA.get(), latestB.get()))
            }
    
            launch(coroutineContext) {
                sourceB.consumeEach { b ->
                    latestB.set(b)
                    bInitialized = true
                    if (aInitialized && bInitialized) {
                        send(combineFunction(latestA.get(), latestB.get()))
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 你对sourceA元素的每一次消费都启动一个新的corountine,这些肯定会开始堆积起来互相竞争吗?我会使用 select 在同一个循环中收听两个频道
    【解决方案2】:

    我知道这是一个老问题,但这里有一个建议:

    我建议使用.zip() 而不是嵌套.consumeEach。查看文档here

    可能的解决方案sourceA.zip(sourceB).consumeEach{} 生成 Pair 类型的项目。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-11-28
      • 2020-04-12
      • 2021-06-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-19
      • 1970-01-01
      相关资源
      最近更新 更多