【问题标题】:Process output of arbitrary number of goroutines as they finish在完成时处理任意数量的 goroutines 的输出
【发布时间】:2021-11-07 20:46:44
【问题描述】:

WaitGroups 用于等待所有 goroutine 在继续执行之前完成,但是当它们完成时如何处理它们的输出?

这个方法没问题

c := make(chan string)
rc := 0
for _, url := range urls {
    rc++
    go func(url string) {
        data := get(url)
        c <- data
    }(url)
}
for i := 0; i < rc; i++ {
    data <- c
}

但是当你必须从 goroutines 调用 goroutines 时它会停止工作

for _, url := range urls {
    go func(url string) {
        data := get(url)
        urls := get_urls(data)
        for _, url := range urls {
            go func(url){
                data := get(url)
                c <- data
            }(url)
        }
    }(url)
}

这一次我们必须将rc++ 放在 goroutine 中,这将导致未定义的行为。做什么?

【问题讨论】:

  • 不要计算。在频道范围内,让你的经理 goroutine 在工作人员完成后关闭频道。收到所有结果后,结果通道范围将自然退出。
  • @colm.anseo manager goroutine 怎么知道?
  • 通过sync.WaitGroup

标签: go concurrency synchronization web-crawler goroutine


【解决方案1】:

有很多方法可以解决这个问题,但最惯用的(因此可能是“最佳”)是 colm.anseo suggested in a comment:让“处理输出”例程使用范围 for 循环,并让生成例程使用 @ 987654322@ 带有关闭 goroutine 的计数器,如下所示:

// result channel:
c := make(chan string)

// wait-group:
var wg sync.WaitGroup

// run over the URLs
for _, url := range urls {
    wg.Add(1) // count up another start()
    go start(&wg, url, c)
}
go func() {
    // wait for all start()s to say they are done
    wg.Wait()
    // and now close c
    close(c)
}()
for data := range c {
    // deal with data
}

现在我们可以看到每个start 函数是如何工作的:

// start() loads up a set of URLs from a URL,
// then spins off worker goroutines that read from
// each of those URLs, sending data to channel c.
// When all of its subsidiary goroutines have finished,
// start() signals that it is done.
func start(wg *sync.Waitgroup, url string, c chan string) {
    defer wg.Done()
    data := get(url)
    urls := get_urls(data)
    var subWG sync.WaitGroup
    for _, url := range urls {
        subWG.Add(1)
        go func(url) {
            defer subWG.Done()
            data := get(url)
            c <- data
        }(url)
    }
    subWG.Wait()
}

函数start 当然可以内联,就像你最初做的那样;为了清楚起见,我这样写。

(一般来说,你真的想要一个“有限数量的工作人员”模式,即 工作人员池,而不是在每个 URL 中分离出一些神秘数量的 goroutines。有很多例子这个在网络上。)

【讨论】:

  • 是的,限制worker的数量确实是必要的,以避免过多的连接和恐慌,但是直到@987654327的数据才能知道它们的确切数量收到@。
  • 是的,但是使用工作池系统​​,您可以收集工作并将其分配到工作池中,并通过 来自工作池的自动背压来保持稳定(ish ) 管道流动。
【解决方案2】:

当您有不确定数量的结果时 - 最好让工作人员和工作人员管理器 goroutine 管理结果通道并在所有结果完成后关闭它。这避免了混乱的计数/互斥逻辑。

所以要转换你的第一个例子:

c := make(chan string)

var wg sync.WaitGroup // used by manager goroutine to determine finish

for _, url := range urls {
    wg.Add(1) // about to start worker
    go func(url string) {
        defer wg.Done() // worker is complete
        data := get(url)
        c <- data
    }(url)
}

// manager goroutine
go func() {
    wg.Wait() // all workers are done ...
    close(c) // ... so signal this via channel close
}()

// results collection is then very simple
for data := range c {
    fmt.Println(data)
}

这种设计同样适用于工作人员创建更多 goroutine 的情况。但正如@torek 所说,需要额外的sync.WaitGroup

c := make(chan string)

var wg sync.WaitGroup

for _, url := range urls {
    wg.Add(1) // new worker
    go func(url string) {
        defer wg.Done() // worker done
        data := get(url)
        urls := get_urls(data)
        var wg2 sync.WaitGroup
        for _, url := range urls {
            wg2.Add(1)
            go func(url) {
                defer wg2.Done()
                data := get(url)
                c <- data
            }(url)
        }
        wg2.Wait()
    }(url)
}

// manager goroutine
go func() {
    wg.Wait() // all workers are done ...
    close(c)  // ... so signal this via channel close
}()

for data := range c {
    fmt.Println(data)
}

【讨论】:

  • 你和我第一次做的一样,忘了有一个子等待组。当然,我们也可以在子 go-routines 中进行额外的 wg.Add(1)defer wg.Done() 调用,使用单个整体等待组计数器。
  • 为什么必须将工人添加到新的等待组中?
  • 因为我们也需要跟踪它们的生命周期。对于一个被认为是完整的工人,我们需要知道它的子工人是完整的。那么一旦所有的工人都完成了,我们就可以确定地关闭结果通道。 (过早关闭通道,会导致在关闭通道上写入时出现恐慌。)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-07
相关资源
最近更新 更多