【发布时间】:2020-04-18 23:42:37
【问题描述】:
我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:
- 从输入通道接收数据。
- 将数据委托给可用的工作人员。
- worker 将结果发送到输出通道。
- 所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
但是,运行它可能会在不处理所有输入的情况下退出,或者会出现send on closed channel 消息的恐慌。使用-race 标志构建源会在close(out) 和out <- res 之间发出数据竞争警告。
这就是我认为可能发生的事情。一旦许多工人完成了他们的工作,wg 的计数器会在一瞬间达到零。因此,wg.Wait() 完成,程序继续执行close(out)。同时,job 通道还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out 频道已经关闭,因此会导致恐慌。
是否应该将等待组放在其他地方?或者有没有更好的方法来等待所有工人完成?
【问题讨论】:
标签: go concurrency race-condition goroutine