【发布时间】:2019-11-14 05:51:10
【问题描述】:
我有一个案例,我需要扇出发送到同一频道的接收器:
func MessagesFromSQS(ctx context.Context, sqsClient sqsiface.SQSAPI) chan *sqs.Message {
messages := make(chan *sqs.Message)
go func() {
defer close(messages)
wg := sync.WaitGroup{}
for i := 0; i < parallelSQSReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
// ...
for _, message := range result.Messages {
messages <- message
}
}
}
}()
}
wg.Wait()
}()
return messages
}
对我来说这是有道理的。但是,竞争检测器抱怨不同的 goroutine 以及发送和关闭通道。我意识到负责发送的goroutine应该是同一个关闭的,但是正确的方法是什么?
编辑/解决:感谢您的回复。事实证明我没有正确读取比赛检测器堆栈跟踪。我假设我更改的代码引入了错误,而不是在 SQS 模拟中发现错误。一旦我正确同步ReceiveMessage() 就可以了。
【问题讨论】:
-
是否有其他代码发送到
messages?种族检测器报告的代码是否都在MessagesFromSQS中定义的匿名函数中? -
可能情况不太一样,但请参阅我对stackoverflow.com/q/58793428/1256452 的回答。您在此处显示的代码看起来不错。