【发布时间】:2015-04-13 22:37:54
【问题描述】:
我正在编写一个应用程序,用户可以从多个“工作”(实际上是 URL)开始。在开始时(主程序),我将这些 URL 添加到队列中,然后启动 x 个处理这些 URL 的 goroutine。
在特殊情况下,URL 指向的资源可能包含更多必须添加到队列中的 URL。 3 名工人正在等待新的工作进入并处理它们。问题是:一旦每个工人都在等待工作(并且没有人在生产任何工作),工人应该完全停止。所以要么全部工作,要么没有人工作。
我当前的实现看起来像这样,我不认为它很优雅。不幸的是,我想不出一个不包含竞争条件的更好方法,而且我不完全确定这个实现是否真的按预期工作:
var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}
func work(working chan int) {
absent := make(chan struct{}, 1)
// if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
// This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
one := false
for {
select {
case u, ok := <-queue.Pop():
if !ok {
close(absent)
return
}
if !one {
// I have started working (delta + 1)
working <- 1
absent <- struct{}{}
one = true
}
// do work with u (which may lead to queue.Push(urls...))
case <-absent: // no jobs at the moment. consume absent => wait
one = false
working <- -1
}
}
}
func Start() {
working := make(chan int)
for i := 0; i < WORKER_COUNT; i++ {
go work(working)
}
// the amount of actually working workers...
sum := 0
for {
delta := <-working
sum += delta
if sum == 0 {
queue.Close() // close channel -> kill workers.
done <- struct{}{}
return
}
}
}
有没有更好的方法来解决这个问题?
【问题讨论】:
-
简短版:您已经明白了,但
sync.WaitGroup是处理您使用working频道进行的簿记的更简单方法。 -
好吧,我实际上认为在某处读到,在主线程等待后添加到等待组是一个坏主意。阅读文档后,我想我看错了......
-
罗杰。在答案中发布了
sync.WaitGroup示例(以及我必须这样做的代码的链接),供任何可能偶然发现此问题的人使用。 -
要求更具体:工人总数不得超过
WORKER_COUNT(选项是让WORKER_COUNT - 1工人工作+主例程。另外,假设每个工作都消失了(不剩下的工作),并且当前正在处理的最后一个工作会产生另外 100 个工作,您的解决方案会为此工作吗?意思是:WORKER_COUNT工人会处理即将到来的 100 个工作,还是会减少到 1 个工作为那份工作? -
是的,新版本会处理所有这些,请查看下面的内容——它恰好启动了四个工作人员,当一个工作人员无法将子任务卸载到另一个工作人员时,它会立即执行(即同步)。
标签: concurrency go synchronization worker