【发布时间】:2018-07-22 09:56:01
【问题描述】:
虽然answering a question 我试图实现一个设置,其中主线程加入CommonPool 的工作以并行执行许多独立任务(这就是java.util.streams 的操作方式)。
我创建了与CommonPool 线程一样多的actor,以及一个用于主线程的通道。演员使用会合频道:
val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
actor<Task>(CommonPool) {
for (task in channel) {
task.execute().also { resultChannel.send(it) }
}
}
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel
这允许我使用select 表达式来分配负载,为每个任务找到一个空闲的actor:
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
所以我发送所有任务并关闭频道:
launch(CommonPool) {
jobs.forEach { task ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
}
allComputeChannels.forEach { it.close() }
}
现在我必须为主线程编写代码。这里我决定同时服务mainComputeChannel,执行提交给主线程的任务,和resultChannel,将各个结果累加到最终的总和中:
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_TASKS) {
select<Unit> {
mainComputeChannel.onReceive { task ->
task.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
resultChannel.close()
sum
}
这导致mainComputeChannel 可能从CommonPool 线程关闭,但resultChannel 仍需要服务的情况。如果通道关闭,onReceive 将抛出异常,onReceiveOrNull 将立即选择 null。这两种选择都不可接受。如果mainComputeChannel 已关闭,我也找不到避免注册它的方法。如果我使用if (!mainComputeChannel.isClosedForReceive),注册调用不会是原子的。
这引出了我的问题:在某些频道可能会被另一个线程关闭而其他线程仍处于活动状态时,选择哪些频道是一个好的习惯用法?
【问题讨论】:
标签: multithreading kotlin kotlin-coroutines