【发布时间】:2018-03-11 00:40:32
【问题描述】:
我需要在ReceiveChannel 上实现以下.combineLatest() 扩展功能
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
// ?
}
我希望它像 RxJava 的 combineLatest() 一样运行。
我该怎么做?
编辑:到目前为止,我有这个,但它不起作用。 sourceB.consumeEach{ } 块永远不会被执行。
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
val sourceA: ReceiveChannel<A> = this@combineLatest
val sourceB: ReceiveChannel<B> = otherSource
var latestA: A? = null
var latestB: B? = null
sourceA.consumeEach { a ->
latestA = a
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
sourceB.consumeEach { b ->
latestB = b
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
}
我还想确保当这个函数返回的ReceiveChannel<R>被关闭(取消订阅)时,我想确保父频道正确关闭。
【问题讨论】:
标签: kotlin reactive-programming kotlinx.coroutines