【问题标题】:Concurrent queue which returns channels, locking doubts返回通道的并发队列,锁定疑惑
【发布时间】:2018-11-25 04:15:04
【问题描述】:

有一个不重要结构Message的队列,有经典的push和pop方法:

type Queue struct {
    messages list.List
}

//The implementation is not relevant for the sake of the question
func (q *Queue) Push(msg Message) { /*...*/ }
func (q *Queue) Pop() (Message, bool) { /*...*/ }

/*
 * NewTimedChannel runs a goroutine which pops a message from the queue every 
 * given time duration and sends it over the returned channel 
 */
func (q *Queue) NewTimedChannel(t time.Duration) (<-chan Message) {/*...*/}

Push 函数的客户端将是一个 web gui,用户将在其中发布他们的消息。
NewTimedChannel 返回的通道的客户端将是一个服务,它通过网络将每条消息发送到不相关的端点。

我是并发新手,我有以下问题:

我知道,由于Queue.messages 是在用户提交 Web 表单后处理推送消息的主 goroutine 和为每个 NewTimedChannel 调用创建的 goroutine 之间的共享状态,我需要锁定它。

我是否需要在所有 Push、Pop 和 NewTimedChannel 方法中使用 sync.Mutex 锁定和解锁?
还有更惯用的方法来处理这个特定问题在 go 环境中?

【问题讨论】:

  • 1.是的。可以看出,如果在比赛检测器下运行(你应该习惯这样做)。 2.没有。

标签: go concurrency locking


【解决方案1】:

是否需要在所有 Push、Pop 和 NewTimedChannel 方法中使用 sync.Mutex 进行锁定和解锁?

是的。

还有没有更惯用的方法来处理这个特定问题 go 环境?

为了深入了解,请查看此问题的最后一个答案:

How do I (succinctly) remove the first element from a slice in Go?

【讨论】:

    【解决方案2】:

    正如其他人指出的那样,它需要同步,否则会出现数据竞争。

    Go 中有句谚语,“不要通过共享内存进行通信,通过通信来共享内存。”在这种情况下,我认为一种惯用的方法是让通道发送到一个单独的 goroutine,该 goroutine 使用select 将所有操作同步在一起。可以通过添加更多通道来支持更多类型的操作(例如您的代码中的定时通道,我不完全理解它是做什么的),并且通过使用select 和其他实用程序,可以轻松扩展代码通过使用锁来处理更复杂的同步。我写了一些示例代码:

    type SyncQueue struct {
        Q AbsQueue
        pushCh,popMsgCh chan Message
        popOkCh chan bool
        popCh chan struct{}
    }
    
    // An abstract of the Queue type. You can remove the abstract layer.
    type AbsQueue interface {
        Push(Message)
        Pop() (Message,bool)
    } 
    
    func (sq SyncQueue) Push(m Message) {
        sq.pushCh <- m
    }
    
    func (sq SyncQueue) Pop() (Message,bool) {
        sq.popCh <- struct{}{} // send a signal for pop. struct{}{} cost no memory at all.
    
        return <-sq.popMsgCh,<-sq.popOkCh
    }
    
    // Every pop and push get synchronized here.
    func (sq SyncQueue) Run() {
        for {
            select {
            case m:=<-pushCh:
                Q.Push(m)
            case <-popCh:
                m,ok := Q.Pop()
                sq.popMsgCh <- m
                sq.popOkCh <- ok
            }   
        }
    }
    
    func NewSyncQueue(Q AbsQueue) *SyncQueue {
        sq:=SyncQueue {
            Q:Q,
            pushCh: make(chan Message),popMsgCh: make(chan Message),
            pushOkCh: make(chan bool), popCh: make(chan struct{}),
        }
        go sq.Run()
        return &sq 
    }
    

    注意,为了简单起见,我没有使用退出通道或context.Context,所以sq.Run()的goroutine无法退出,会导致内存泄漏。

    【讨论】:

      猜你喜欢
      • 2017-03-28
      • 1970-01-01
      • 2021-08-20
      • 1970-01-01
      • 1970-01-01
      • 2020-03-02
      • 2013-04-16
      • 2021-08-08
      • 2017-01-02
      相关资源
      最近更新 更多