【问题标题】:Is there some elegant way to pause and resume any other goroutine?是否有一些优雅的方式来暂停和恢复任何其他 goroutine?
【发布时间】:2013-04-12 15:46:34
【问题描述】:

就我而言,我有数千个 goroutine 同时作为 work() 工作。我还有一个sync() goroutine。当sync 启动时,我需要任何其他 goroutine 在同步作业完成后暂停一段时间。这是我的代码:

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := <- channel // blocked here
      if sync_stat == 0 { // if sync complete
        break  
      }
    }
    // Do some jobs
    if (some condition) {
      return
    }
  }
}

func sync() {
  channels_mutex.Lock()
  // do some sync

  for int i := 0; i != len(channels); i++ {
    channels[i] <- 0
  }
  channels_mutex.Unlock()
}

现在的问题是,由于&lt;- 总是在读取时阻塞,所以每次去sync_stat := &lt;- channel 都是阻塞的。我知道如果频道关闭了它不会被阻止,但是由于我必须在work()退出之前使用这个频道,并且我没有找到任何方法来重新打开已关闭的频道。

我怀疑自己走错路了,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?

【问题讨论】:

    标签: go channel goroutine


    【解决方案1】:

    如果我理解正确,您需要 N 个工人和一个控制器,可以随意暂停、恢复和停止工人。下面的代码就可以做到这一点。

    package main
    
    import (
        "fmt"
        "runtime"
        "sync"
    )
    
    // Possible worker states.
    const (
        Stopped = 0
        Paused  = 1
        Running = 2
    )
    
    // Maximum number of workers.
    const WorkerCount = 1000
    
    func main() {
        // Launch workers.
        var wg sync.WaitGroup
        wg.Add(WorkerCount + 1)
    
        workers := make([]chan int, WorkerCount)
        for i := range workers {
            workers[i] = make(chan int, 1)
    
            go func(i int) {
                worker(i, workers[i])
                wg.Done()
            }(i)
        }
    
        // Launch controller routine.
        go func() {
            controller(workers)
            wg.Done()
        }()
    
        // Wait for all goroutines to finish.
        wg.Wait()
    }
    
    func worker(id int, ws <-chan int) {
        state := Paused // Begin in the paused state.
    
        for {
            select {
            case state = <-ws:
                switch state {
                case Stopped:
                    fmt.Printf("Worker %d: Stopped\n", id)
                    return
                case Running:
                    fmt.Printf("Worker %d: Running\n", id)
                case Paused:
                    fmt.Printf("Worker %d: Paused\n", id)
                }
    
            default:
                // We use runtime.Gosched() to prevent a deadlock in this case.
                // It will not be needed of work is performed here which yields
                // to the scheduler.
                runtime.Gosched()
    
                if state == Paused {
                    break
                }
    
                // Do actual work here.
            }
        }
    }
    
    // controller handles the current state of all workers. They can be
    // instructed to be either running, paused or stopped entirely.
    func controller(workers []chan int) {
        // Start workers
        setState(workers, Running)
    
        // Pause workers.
        setState(workers, Paused)
    
        // Unpause workers.
        setState(workers, Running)
    
        // Shutdown workers.
        setState(workers, Stopped)
    }
    
    // setState changes the state of all given workers.
    func setState(workers []chan int, state int) {
        for _, w := range workers {
            w <- state
        }
    }
    

    【讨论】:

    • `
    • 使用通道进行通信是 Go 语言的惯用方式。如果需要,您可以使用全局变量。但我建议不要使用sync.Mutex 来锁定它。在处理大量 goroutines 时,它们的扩展性不是很好,每个 goroutines 都获取 R/W 锁。在这种情况下,我将使用 sync/atomic 包以原子方式读取/写入状态。
    • @jimt 不错的例子;很好地提醒了频道阻塞时如何运行“默认”。我要补充一点,如果工作人员有时需要一段时间才能回来阅读状态,最好让与他们交谈的通道的缓冲区为 1,这样你就可以匆忙写给他们所有人在 1 号停止之前不暂停或停止 2 号,等等。还是我误解了它是如何工作的?
    • 也许我遗漏了一些东西,但暂停的工作人员不会继续运行(由于选择的默认部分)。我认为这会导致很多 unnec。线程上下文切换。
    • 是的,它将继续进入默认情况,但它并不像您预期​​的那样昂贵。它的作用是将state 设置为适当的值。由您决定是否以及如何限制循环。至于上下文切换,here's 非常详尽地解释了它在 Go 中的工作原理。
    猜你喜欢
    • 2020-06-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-01
    • 2015-04-11
    • 1970-01-01
    相关资源
    最近更新 更多