【问题标题】:Moving Window With Kotlin Flow使用 Kotlin Flow 移动窗口
【发布时间】:2022-01-16 11:49:41
【问题描述】:
我正在尝试使用 Kotlin Flows 创建一个移动的数据窗口。
在 RxKotlin 中可以使用缓冲区来实现,但是使用 Flows 时缓冲区不一样。
RxKotlin 有一个 buffer 运算符,定期将 Observable 发出的项目收集到包中并发出这些包,而不是一次发出一个项目 - buffer(count, skip)
Kotlin Flow 有一个 buffer,但它只是在一个单独的协程中运行一个收集器 - buffer
Flows 中是否存在可以实现此目的的现有运算符?
【问题讨论】:
标签:
kotlin
kotlin-coroutines
rx-kotlin
kotlinx.coroutines.flow
【解决方案1】:
我认为 Kotlinx Coroutines 库中没有您要查找的内容,但有 an open issue。
this comment 中也有一个可能的实现,我也将包括在这里:
fun <T> Flow<T>.windowed(size: Int, step: Int): Flow<List<T>> = flow {
// check that size and step are > 0
val queue = ArrayDeque<T>(size)
val toSkip = max(step - size, 0) < if sbd would like to skip some elements before getting another window, by serving step greater than size, then why not?
val toRemove = min(step, size)
var skipped = 0
collect { element ->
if(queue.size < size && skipped == toSkip) {
queue.add(element)
}
else if (queue.size < size && skipped < toSkip) {
skipped++
}
if(queue.size == size) {
emit(queue.toList())
repeat(toRemove) { queue.remove() }
skipped = 0
}
}
}