【发布时间】:2022-01-02 05:18:12
【问题描述】:
我想从 Flow 中收集特定数量的值,直到发生值发出超时。不幸的是,没有这样的运算符,所以我尝试使用debounce 运算符实现我自己的。
第一个问题是生产者太快了,有些包被跳过了,根本没有收集(它们在原始packages流的onEach中,但不在merge的第二流的onEach中withNullOnTimeout)。
第二个问题-根据amount参数取最后一个值后,原始流关闭,但超时流仍然存在,最后在最后一个值后产生超时。
如何解决这两个问题?
我的实现:
suspend fun receive(packages: Flow<ByteArray>, amount: Int): ByteArray {
val buffer = ByteArrayOutputStream(blockSize.toInt())
packages
.take(10)
.takeUntilTimeout(100) // <-- custom timeout operator
.collect { pck ->
buffer.write(pck.data)
}
return buffer.toByteArray()
}
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
require(durationMillis > 0) { "Duration should be greater than 0" }
return withNullOnTimeout(durationMillis)
.takeWhile { it != null }
.mapNotNull { it }
}
fun <T> Flow<T>.withNullOnTimeout(durationMillis: Long): Flow<T?> {
require(durationMillis > 0) { "Duration should be greater than 0" }
return merge(
this,
map<T, T?> { null }
.onStart { emit(null) }
.debounce(durationMillis)
)
}
【问题讨论】:
-
takeUntilTimeout应该如何表现?计时器应该什么时候准确启动?