【问题标题】:How to close a channel with multiple senders?如何关闭具有多个发件人的频道?
【发布时间】:2019-11-14 05:51:10
【问题描述】:

我有一个案例,我需要扇出发送到同一频道的接收器:

func MessagesFromSQS(ctx context.Context, sqsClient sqsiface.SQSAPI) chan *sqs.Message {
    messages := make(chan *sqs.Message)

    go func() {
        defer close(messages)
        wg := sync.WaitGroup{}

        for i := 0; i < parallelSQSReaders; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()

                for {
                    select {
                    case <-ctx.Done():
                        return

                    default:
                        // ...

                        for _, message := range result.Messages {
                            messages <- message
                        }
                    }
                }
            }()
        }

        wg.Wait()
    }()

    return messages
}

对我来说这是有道理的。但是,竞争检测器抱怨不同的 goroutine 以及发送和关闭通道。我意识到负责发送的goroutine应该是同一个关闭的,但是正确的方法是什么?

编辑/解决:感谢您的回复。事实证明我没有正确读取比赛检测器堆栈跟踪。我假设我更改的代码引入了错误,而不是在 SQS 模拟中发现错误。一旦我正确同步ReceiveMessage() 就可以了。

【问题讨论】:

  • 是否有其他代码发送到messages?种族检测器报告的代码是否都在MessagesFromSQS中定义的匿名函数中?
  • 可能情况不太一样,但请参阅我对stackoverflow.com/q/58793428/1256452 的回答。您在此处显示的代码看起来不错。

标签: go channels


【解决方案1】:

当你知道不再有任何写入时关闭通道,即当所有工作程序 go-routines 都完成时。

所以:

wg.Wait()
close(messages)

附:我会重组你对上下文取消的轮询,将它与你的频道写在一个选择中结合起来,例如

for _, message := range result.Messages {

    select {
        case messages <- message:
        case <-ctx.Done():
            return
    }

}

【讨论】:

  • 这是一种不太安全的方法,或者我已经得到了。问题是 close() 发生在与发送不同的 goroutine 中。
  • 关闭其他 go-routine(s) 编写者的通道并不少见。 waitgroup 与 wait() 相结合,确保没有人可以写入通道。这是完全安全的协调。
  • 另外,永远不应该在通道上发生数据竞争——它们是为并发使用而设计的。 (对此有一些警告——但只要通道在使用前被初始化——你应该是安全的)。我怀疑(正如@CeriseLimon 评论的那样)数据竞赛是result.Messages。如果这是一个切片,并且在工作人员正在读取它时被写入 - 那么这是一个主要问题。您可能想考虑改用输入通道 - 工作人员从中读取。那么任何数据竞争都应该消失。
  • 谢谢。事实证明我没有正确读取比赛检测器堆栈跟踪。我假设我更改的代码引入了错误,而不是在 SQS 模拟中发现错误。一旦我正确同步了返回消息的方法就可以了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-05
  • 2011-10-07
  • 2010-12-15
  • 2013-03-20
  • 1970-01-01
  • 2017-04-13
相关资源
最近更新 更多