【发布时间】:2018-05-21 11:00:32
【问题描述】:
我想在一些 goroutine 之间分配一些负载。如果事先知道任务的数量,那么组织起来很容易。例如,我可以与等待组一起扇出。
nTasks := 100
nGoroutines := 10
// it is important that this channel is not buffered
ch := make(chan *Task)
done := make(chan bool)
var w sync.WaitGroup
// Feed the channel until done
go func () {
for i:= 0; i < nTasks; i++ {
task := getTaskI(i)
ch <- task
}
// as ch is not buffered once everything is read we know we have delivered all of them
for i:=0; i < nGoroutines; i++ {
done <- false
}
}()
for i:= 0; i < nGoroutines; i ++ {
w.Add(1)
go func () {
defer w.Done()
select {
case task := <-ch:
doSomethingWithTask(task)
case <- done:
return
}
}()
}
w.Wait()
// All tasks done, all goroutines closed
但是,在我的情况下,每个任务都会返回更多要完成的任务。比如说一个爬虫,我们从爬取的网络中接收所有链接。我最初的预感是有一个主循环,我可以在其中跟踪已完成的任务数和待处理的任务数。完成后,我向所有 goroutine 发送完成信号:
nGoroutines := 10
ch := make(chan *Task, nGoroutines)
feedBackChannel := make(chan * Task, nGoroutines)
done := make(chan bool)
for i:= 0; i < nGoroutines; i ++ {
go func () {
select {
case task := <-ch:
task.NextTasks = doSomethingWithTask(task)
feedBackChannel <- task
case <- done:
return
}
}()
}
// seed first task
ch <- firstTask
nTasksRemaining := 1
for nTasksRemaining > 0 {
task := <- feedBackChannel
nTasksRemaining -= 1
for _, t := range(task.NextTasks) {
ch <- t
nTasksRemaining++
}
}
for i:=0; i < nGoroutines; i++ {
done <- false
}
但是,这会产生死锁。例如,如果 NextTasks 大于 goroutine 的数量,那么当第一个任务完成时主循环将停止。但是第一个任务无法完成,因为 mainLoop 正在等待写入,反馈被阻塞。
解决这个问题的一个“简单”方法是异步发布到频道:
而不是做feedBackChannel <- task 做go func () {feedBackChannel <- task}()。现在,这感觉就像一个可怕的黑客攻击。特别是因为可能有数十万个任务。
什么是避免这种僵局的好方法?我已经搜索了并发模式,但大多是更简单的东西,例如扇出或后期阶段不影响早期步骤的管道。
【问题讨论】:
-
您的描述有点过于复杂,无法完全理解,但我有 2 条注释。 1. 你在 goroutine 中错误地执行了 waitGroup.Add(),它应该在调用它之前完成。一旦 goroutine 启动,我通常会立即调用 defer waitGroup.Done()。 2. 不清楚为什么需要feedbackChannel。在我看来,您只需根据需要生成新的 grorutines,并在主线程中执行 watGroup.Wait()。但我可能缺少一些要求。
-
@AlexanderTrakhimenok 生成新的 goroutine 会起作用,但我想重用 goroutine 并限制它们的数量会消耗更少的资源,对吧?就我而言,我预计会有数十万个任务。 (顺便说一句,我已经修复了等待组)
-
Go 例程非常轻量,通常会生成数百万个 goroutine 而没有任何问题。虽然您可以查看速率限制模式gobyexample.com/rate-limiting
-
Goroutines 非常轻量级。我至少会在花哨之前测试您的程序需要多少资源。通常数十万个同时运行的 goroutine 都不是问题。
-
@GabrielFurstenheim goroutines 的开销非常低,我发现通过生成 goroutines 而不是尝试自己管理工作来管理类似的工作队列相对容易。另一方面,它大约是数千,而不是数十万,因此它可能不适用于您的情况。但是试一试,它可以工作,这会让你的代码变得简单得多。
标签: go concurrency