【问题标题】:Fan-out / fan-in - closing result channel扇出/扇入——关闭结果通道
【发布时间】:2018-01-06 17:18:30
【问题描述】:

我正在生产项目,从多个协同程序中消费并推回 resultChannel。生产者在最后一项之后关闭其频道。

代码永远不会结束,因为 resultChannel 永远不会被关闭。如何检测并正确完成迭代使hasNext()返回false

val inputData = (0..99).map { "Input$it" }
val threads = 10

val bundleProducer = produce<String>(CommonPool, threads) {
    inputData.forEach { item ->
        send(item)
        println("Producing: $item")
    }

    println("Producing finished")
    close()
}

val resultChannel = Channel<String>(threads)

repeat(threads) {
    launch(CommonPool) {
        bundleProducer.consumeEach {
            println("CONSUMING $it")
            resultChannel.send("Result ($it)")
        }
    }
}

val iterator = object : Iterator<String> {
    val iterator = resultChannel.iterator()
    override fun hasNext() = runBlocking { iterator.hasNext() }
    override fun next() = runBlocking { iterator.next() }
}.asSequence()

println("Starting interation...")

val result = iterator.toList()

println("finish: ${result.size}")

【问题讨论】:

  • 我发现的骇人听闻的方法是对结果序列执行 .take(100) ,但我不确定它离开底层结构的状态是什么。

标签: kotlin channel coroutine kotlinx.coroutines


【解决方案1】:

您可以运行一个协程等待消费者完成,然后关闭resultChannel

首先,重写启动消费者的代码以保存Jobs:

val jobs = (1..threads).map {
    launch(CommonPool) {
        bundleProducer.consumeEach {
            println("CONSUMING $it")
            resultChannel.send("Result ($it)")
        }
    }
}

然后运行另一个协程,一旦所有Jobs 完成后关闭通道:

launch(CommonPool) {
    jobs.forEach { it.join() }
    resultChannel.close()
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-08-25
    • 1970-01-01
    • 2018-12-16
    • 1970-01-01
    • 2019-01-02
    • 2014-06-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多