【问题标题】:Kotlin Flow: receiving values until emitting timeoutKotlin Flow:接收值直到发出超时
【发布时间】:2022-01-02 05:18:12
【问题描述】:

我想从 Flow 中收集特定数量的值,直到发生值发出超时。不幸的是,没有这样的运算符,所以我尝试使用debounce 运算符实现我自己的。

第一个问题是生产者太快了,有些包被跳过了,根本没有收集(它们在原始packages流的onEach中,但不在merge的第二流的onEachwithNullOnTimeout)。

第二个问题-根据amount参数取最后一个值后,原始流关闭,但超时流仍然存在,最后在最后一个值后产生超时。

如何解决这两个问题?

我的实现:


suspend fun receive(packages: Flow<ByteArray>, amount: Int): ByteArray {
        val buffer = ByteArrayOutputStream(blockSize.toInt())
        packages
            .take(10)
            .takeUntilTimeout(100) // <-- custom timeout operator
            .collect { pck ->
                buffer.write(pck.data)
            }
    return buffer.toByteArray()
}

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return withNullOnTimeout(durationMillis)
        .takeWhile { it != null }
        .mapNotNull { it }
}

fun <T> Flow<T>.withNullOnTimeout(durationMillis: Long): Flow<T?> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return merge(
        this,
        map<T, T?> { null }
            .onStart { emit(null) }
            .debounce(durationMillis)
    )
}

【问题讨论】:

  • takeUntilTimeout 应该如何表现?计时器应该什么时候准确启动?

标签: kotlin kotlin-coroutines


【解决方案1】:

这对我来说最初似乎很明显,但正如 Joffrey 在 cmets 中指出的那样,它可能会在收集终止之前造成不必要的延迟。我将把它作为一个次优、过于简单的解决方案的示例。

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> = flow {
    val endTime = System.currentTimeMillis() + durationMillis
    takeWhile { System.currentTimeMillis() >= endTime }
        .collect { emit(it) }
}

这是我没有测试的另一个想法。

@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    val signal = Any()
    return merge(flow { delay(durationMillis); emit(signal) })
        .takeWhile { it !== signal } as Flow<T>
}

【讨论】:

  • 我认为这样做的一个缺点是您需要从源流中获取一个元素才能检测超时。因此,如果原始流每 10 秒发出一次,而您的超时时间为 1 秒,则您需要再等待 9 秒才能停止流
  • 我相信更重要的是计时器启动是错误的,因为流程应该是冷的。在这里,操作员将立即计算当前时间。这当然取决于 OP 的要求,但我认为它作为一般的流操作员是不合适的
  • 哦,对了。我从flow 构建器开始,并过度简化到我放在这里的内容。我修好了。
【解决方案2】:

怎么样:

fun <T> Flow<T>.takeUntilTimeout(timeoutMillis: Long) = channelFlow {
    val collector = launch {
        collect {
            send(it)
        }
        close()
    }
    delay(timeoutMillis)
    collector.cancel()
    close()
}

使用channelFlow 允许您生成第二个协程,这样您就可以独立计算时间,而且非常简单。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-30
    • 1970-01-01
    • 2023-01-31
    • 2021-10-28
    • 1970-01-01
    • 2022-09-27
    • 1970-01-01
    • 2013-04-09
    相关资源
    最近更新 更多