【发布时间】:2021-09-30 12:12:33
【问题描述】:
我正在学习 kotlin 协程和流程,但有一件事对我来说有点晦涩难懂。如果我有一个长时间运行的常规协程循环,我可以使用 isActive 或 ensureActive 来处理取消。然而,这些不是为流程定义的,但以下代码正确地完成了流程:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
private val logger = LoggerFactory.getLogger("Main")
fun main() {
val producer = FlowProducer()
runBlocking {
producer
.produce()
.take(10)
.collect {
logger.info("Received $it")
}
}
logger.info("done")
}
class FlowProducer {
fun produce() = flow {
try {
var counter = 1
while (true) {
logger.info("Before emit")
emit(counter++)
logger.info("After emit")
}
}finally {
logger.info("Producer has finished")
}
}.flowOn(Dispatchers.IO)
}
为什么会这样?是因为发射是一个可以为我处理取消的可挂起函数吗?如果有条件地调用了发射怎么办?例如,该循环实际上从 Kafka 轮询记录,并且仅当接收到的记录不为空时才调用发出。那么我们可以有这样的情况:
- 我们想要 10 条消息(取 10 条)
- 其实kafka主题只有10条消息
- 由于没有更多消息,因此不会再次调用 emit,因此即使我们收到了所有想要的消息,循环也会继续在不必要的轮询上浪费资源。
不确定我的理解是否正确。在这种情况下,我应该在每个循环上调用 yield() 吗?
【问题讨论】:
标签: kotlin kotlin-coroutines kotlin-flow