【问题标题】:How to wait for all goroutines to finish without using time.Sleep?如何在不使用 time.Sleep 的情况下等待所有 goroutines 完成?
【发布时间】:2013-08-14 23:47:25
【问题描述】:

此代码选择同一文件夹中的所有 xml 文件,作为调用的可执行文件,并在回调方法中对每个结果进行异步处理(在下面的示例中,仅打印出文件的名称)。

如何避免使用 sleep 方法来防止 main 方法退出?我在处理频道时遇到了问题(我认为这是同步结果所需要的),因此感谢您的帮助!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}

【问题讨论】:

    标签: go synchronization goroutine


    【解决方案1】:

    这是一个使用 WaitGroup 的解决方案。

    首先,定义2个实用方法:

    package util
    
    import (
        "sync"
    )
    
    var allNodesWaitGroup sync.WaitGroup
    
    func GoNode(f func()) {
        allNodesWaitGroup.Add(1)
        go func() {
            defer allNodesWaitGroup.Done()
            f()
        }()
    }
    
    func WaitForAllNodes() {
        allNodesWaitGroup.Wait()
    }
    

    然后,替换callback的调用:

    go callback(fileName)
    

    调用你的实用函数:

    util.GoNode(func() { callback(fileName) })
    

    最后一步,将此行添加到 main 的末尾,而不是 sleep。这将确保主线程在程序停止之前等待所有例程完成。

    func main() {
      // ...
      util.WaitForAllNodes()
    }
    

    【讨论】:

      【解决方案2】:

      尽管sync.waitGroup (wg) 是前进的规范方法,但它确实需要您在wg.Wait 之前至少完成一些wg.Add 调用才能完成。这对于像网络爬虫这样的简单事物可能不可行,因为您事先不知道递归调用的数量,并且需要一段时间来检索驱动 wg.Add 调用的数据。毕竟,在知道第一批子页面的大小之前,你需要加载和解析第一页。

      我使用渠道编写了一个解决方案,避免在我的解决方案中使用 waitGroup Tour of Go - web crawler 练习。每次启动一个或多个 go-routines 时,您都会将号码发送到 children 频道。每次 goroutine 即将完成时,您都会向 done 频道发送 1。当孩子的总和等于完成的总和时,我们就完成了。

      我唯一关心的是results 频道的硬编码大小,但这是(当前)Go 限制。

      
      // recursionController is a data structure with three channels to control our Crawl recursion.
      // Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
      // The idea is to have three channels, counting the outstanding calls (children), completed calls 
      // (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
      // sufficiently careful to signal any new children before closing your current one, as you may be the last one).
      //
      type recursionController struct {
          results  chan string
          children chan int
          done     chan int
      }
      
      // instead of instantiating one instance, as we did above, use a more idiomatic Go solution
      func NewRecursionController() recursionController {
          // we buffer results to 1000, so we cannot crawl more pages than that.  
          return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
      }
      
      // recursionController.Add: convenience function to add children to controller (similar to waitGroup)
      func (rc recursionController) Add(children int) {
          rc.children <- children
      }
      
      // recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
      func (rc recursionController) Done() {
          rc.done <- 1
      }
      
      // recursionController.Wait will wait until all children are done
      func (rc recursionController) Wait() {
          fmt.Println("Controller waiting...")
          var children, done int
          for {
              select {
              case childrenDelta := <-rc.children:
                  children += childrenDelta
                  // fmt.Printf("children found %v total %v\n", childrenDelta, children)
              case <-rc.done:
                  done += 1
                  // fmt.Println("done found", done)
              default:
                  if done > 0 && children == done {
                      fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                      close(rc.results)
                      return
                  }
              }
          }
      }
      

      Full source code for the solution

      【讨论】:

        【解决方案3】:

        sync.WaitGroup 可以在这里为您提供帮助。

        package main
        
        import (
            "fmt"
            "sync"
            "time"
        )
        
        
        func wait(seconds int, wg * sync.WaitGroup) {
            defer wg.Done()
        
            time.Sleep(time.Duration(seconds) * time.Second)
            fmt.Println("Slept ", seconds, " seconds ..")
        }
        
        
        func main() {
            var wg sync.WaitGroup
        
            for i := 0; i <= 5; i++ {
                wg.Add(1)   
                go wait(i, &wg)
            }
            wg.Wait()
        }
        

        【讨论】:

          【解决方案4】:

          WaitGroups 绝对是执行此操作的规范方法。不过,为了完整起见,这里是在引入 WaitGroups 之前常用的解决方案。基本思想是使用通道说“我完成了”,并让主 goroutine 等待,直到每个生成的例程都报告其完成。

          func main() {
              c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
              for i := 0; i < 100; i++ {
                  go func() {
                      doSomething()
                      c <- struct{}{} // signal that the routine has completed
                  }()
              }
          
              // Since we spawned 100 routines, receive 100 messages.
              for i := 0; i < 100; i++ {
                  <- c
              }
          }
          

          【讨论】:

          • 很高兴看到使用普通频道的解决方案。额外的好处:如果doSomething() 返回一些结果,那么您可以将其放在通道上,并且您可以在第二个 for 循环中收集和处理结果(一旦它们准备好)
          • 只有在您已经知道要开始使用的 gorutines 数量的情况下才有效。如果您正在编写某种 html 爬虫并以递归方式为页面上的每个链接启动 gorutines 怎么办?
          • 无论如何,您都需要以某种方式跟踪这一点。使用 WaitGroups 会更容易一些,因为每次生成新的 goroutine 时,您可以先执行 wg.Add(1) ,因此它会跟踪它们。使用频道会有点困难。
          • c 将阻塞,因为所有 go 例程都会尝试访问它,并且它是无缓冲的
          • 如果用“block”表示程序会死锁,那不是真的。 You can try running it yourself. 原因是写入c 的唯一goroutine 与从c 读取的主goroutine 不同。因此,主 goroutine 始终可以从通道中读取值,当其中一个 goroutine 可以将值写入通道时,就会发生这种情况。你是对的,如果这段代码没有生成 goroutine,而是在一个 goroutine 中运行所有东西,它就会死锁。
          【解决方案5】:

          您可以使用sync.WaitGroup。引用链接的例子:

          package main
          
          import (
                  "net/http"
                  "sync"
          )
          
          func main() {
                  var wg sync.WaitGroup
                  var urls = []string{
                          "http://www.golang.org/",
                          "http://www.google.com/",
                          "http://www.somestupidname.com/",
                  }
                  for _, url := range urls {
                          // Increment the WaitGroup counter.
                          wg.Add(1)
                          // Launch a goroutine to fetch the URL.
                          go func(url string) {
                                  // Decrement the counter when the goroutine completes.
                                  defer wg.Done()
                                  // Fetch the URL.
                                  http.Get(url)
                          }(url)
                  }
                  // Wait for all HTTP fetches to complete.
                  wg.Wait()
          }
          

          【讨论】:

          • 有什么理由必须在 go 例程之外执行 wg.Add(1) 吗?我们可以在延迟 wg.Done() 之前在里面做吗?
          • 坐,是的,这是有原因的,它在sync.WaitGroup.Add docs中有描述:Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
          • 修改此代码导致我进行了长时间的调试,因为我的 goroutine 是一个命名函数,并且将 WaitGroup 作为值传递将复制它并使 wg.Done() 无效。虽然这可以通过传递指针 &wg 来解决,但防止此类错误的更好方法是首先将 WaitGroup 变量声明为指针:wg := new(sync.WaitGroup) 而不是 var wg sync.WaitGroup
          • 我想在for _, url := range urls 的上方写wg.Add(len(urls)) 是有效的,我认为最好只使用一次添加。
          • @RobertJackWill:好消息!顺便说一句,the docs 对此进行了介绍:“第一次使用后不得复制 WaitGroup。Go 没有强制执行此操作的方法。但实际上,go vet 确实检测到这种情况并警告“func按值传递锁定:sync.WaitGroup包含sync.noCopy”。
          猜你喜欢
          • 2011-04-25
          • 1970-01-01
          • 1970-01-01
          • 2010-11-04
          • 2015-09-18
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多