【问题标题】:How to safely select across channels where some may get concurrently closed?如何安全地选择一些可能同时关闭的渠道?
【发布时间】:2018-07-22 09:56:01
【问题描述】:

虽然answering a question 我试图实现一个设置,其中主线程加入CommonPool 的工作以并行执行许多独立任务(这就是java.util.streams 的操作方式)。

我创建了与CommonPool 线程一样多的actor,以及一个用于主线程的通道。演员使用会合频道:

val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
    actor<Task>(CommonPool) {
        for (task in channel) {
            task.execute().also { resultChannel.send(it) }
        }
    }
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel

这允许我使用select 表达式来分配负载,为每个任务找到一个空闲的actor:

select {
    allComputeChannels.forEach { chan ->
        chan.onSend(task) {}
    }
}

所以我发送所有任务并关闭频道:

launch(CommonPool) {
    jobs.forEach { task ->
        select {
            allComputeChannels.forEach { chan ->
                chan.onSend(task) {}
            }
        }
    }
    allComputeChannels.forEach { it.close() }
}

现在我必须为主线程编写代码。这里我决定同时服务mainComputeChannel,执行提交给主线程的任务,和resultChannel,将各个结果累加到最终的总和中:

return runBlocking {
    var completedCount = 0
    var sum = 0.0
    while (completedCount < NUM_TASKS) {
        select<Unit> {
            mainComputeChannel.onReceive { task ->
                task.execute().also { resultChannel.send(it) }
            }
            resultChannel.onReceive { result ->
                sum += result
                completedCount++
            }
        }
    }
    resultChannel.close()
    sum
}

这导致mainComputeChannel 可能从CommonPool 线程关闭,但resultChannel 仍需要服务的情况。如果通道关闭,onReceive 将抛出异常,onReceiveOrNull 将立即选择 null。这两种选择都不可接受。如果mainComputeChannel 已关闭,我也找不到避免注册它的方法。如果我使用if (!mainComputeChannel.isClosedForReceive),注册调用不会是原子的。

这引出了我的问题:在某些频道可能会被另一个线程关闭而其他线程仍处于活动状态时,选择哪些频道是一个好的习惯用法?

【问题讨论】:

    标签: multithreading kotlin kotlin-coroutines


    【解决方案1】:

    kotlinx.coroutines 库目前缺少一个原语以方便使用。出色的建议是为select 添加receiveOrClose 函数和onReceiveOrClosed 子句,这将使编写这样的代码成为可能。

    但是,您仍然需要手动跟踪您的 mainComputeChannel 已关闭的事实,并在关闭时停止选择它。因此,使用提议的onReceiveOrClosed 子句,您将编写如下内容:

    // outside of loop
    var mainComputeChannelClosed = false
    // inside loop
    select<Unit> {
        if (!mainComputeChannelClosed) {
            mainComputeChannel.onReceiveOrClosed { 
                if (it.isClosed) mainComputeChannelClosed = true 
                else { /* do something with it */ }
            }
        }
        // more clauses
    }    
    

    详情请见https://github.com/Kotlin/kotlinx.coroutines/issues/330

    目前尚无进一步简化这种模式的建议。

    【讨论】:

    • 我花了一些时间想这个,似乎onReceiveOrClosed是我认为的onReceiveOrNull的合同。所以onReceiveOrNull只会选择null立即,如果通道已经关闭,否则它会像onReceive一样抛出异常?事实证明,我已经考虑过您在此处提出的成语(但错误地使用了onReceiveOrNull),并认为它太复杂而无法承受它的重量。相反,我选择让通道保持打开状态并使用自定义完成信号。
    • 这个成语最糟糕的方面是你必须在select子句中为每个单独的注册重复它。
    • onReceiveOrNullonReceiveOrClosed 之间的区别在于异常关闭的通道。 onReceiveOrNull 会抛出异常,onReceiveOrClose 会返回相应的关闭原因。编写通用组合运算符(列表zipLatest 等)很重要,这些运算符应该从它们的来源之一正确传播错误。 即时性的术语没有区别。所有子句都以相同的方式对通道状态的变化做出反应,无论状态是在它们被调用时已经生效还是在稍后的某个时间生效。
    • 谢谢,基于此,我其实可以用这里的成语来实现我想要的,剩下的唯一问题是成语是否实用。
    猜你喜欢
    • 1970-01-01
    • 2021-05-07
    • 2011-02-02
    • 2020-05-02
    • 2021-09-21
    • 1970-01-01
    • 1970-01-01
    • 2012-05-01
    • 1970-01-01
    相关资源
    最近更新 更多