【问题标题】:Recursive concurrency with golanggolang 递归并发
【发布时间】:2018-05-21 11:00:32
【问题描述】:

我想在一些 goroutine 之间分配一些负载。如果事先知道任务的数量,那么组织起来很容易。例如,我可以与等待组一起扇出。

nTasks := 100
nGoroutines := 10

// it is important that this channel is not buffered
ch := make(chan *Task)
done := make(chan bool)
var w sync.WaitGroup
// Feed the channel until done
go func () {
    for i:= 0; i < nTasks; i++ {
        task := getTaskI(i)
        ch <- task
    }
    // as ch is not buffered once everything is read we know we have delivered all of them
    for i:=0; i < nGoroutines; i++ {
        done <- false
    }
}()
for i:= 0; i < nGoroutines; i ++ {
    w.Add(1)
    go func () {
        defer w.Done()
        select {
        case task := <-ch:
            doSomethingWithTask(task)
        case <- done:
            return
        }
    }()
}
w.Wait()
// All tasks done, all goroutines closed

但是,在我的情况下,每个任务都会返回更多要完成的任务。比如说一个爬虫,我们从爬取的网络中接收所有链接。我最初的预感是有一个主循环,我可以在其中跟踪已完成的任务数和待处理的任务数。完成后,我向所有 goroutine 发送完成信号:

nGoroutines := 10
ch := make(chan *Task, nGoroutines)
feedBackChannel := make(chan * Task, nGoroutines)
done := make(chan bool)

for i:= 0; i < nGoroutines; i ++ {
    go func () {
        select {
        case task := <-ch:
            task.NextTasks = doSomethingWithTask(task)
            feedBackChannel <- task
        case <- done:
            return
        }
    }()
}

// seed first task
ch <- firstTask
nTasksRemaining := 1

for nTasksRemaining > 0 {
    task := <- feedBackChannel
    nTasksRemaining -= 1
    for _, t := range(task.NextTasks) {
        ch <- t
        nTasksRemaining++
    }
}
for i:=0; i < nGoroutines; i++ {
    done <- false
}

但是,这会产生死锁。例如,如果 NextTasks 大于 goroutine 的数量,那么当第一个任务完成时主循环将停止。但是第一个任务无法完成,因为 mainLoop 正在等待写入,反馈被阻塞。

解决这个问题的一个“简单”方法是异步发布到频道: 而不是做feedBackChannel &lt;- taskgo func () {feedBackChannel &lt;- task}()。现在,这感觉就像一个可怕的黑客攻击。特别是因为可能有数十万个任务。

什么是避免这种僵局的好方法?我已经搜索了并发模式,但大多是更简单的东西,例如扇出或后期阶段不影响早期步骤的管道。

【问题讨论】:

  • 您的描述有点过于复杂,无法完全理解,但我有 2 条注释。 1. 你在 goroutine 中错误地执行了 waitGroup.Add(),它应该在调用它之前完成。一旦 goroutine 启动,我通常会立即调用 defer waitGroup.Done()。 2. 不清楚为什么需要feedbackChannel。在我看来,您只需根据需要生成新的 grorutines,并在主线程中执行 watGroup.Wait()。但我可能缺少一些要求。
  • @AlexanderTrakhimenok 生成新的 goroutine 会起作用,但我想重用 goroutine 并限制它们的数量会消耗更少的资源,对吧?就我而言,我预计会有数十万个任务。 (顺便说一句,我已经修复了等待组)
  • Go 例程非常轻量,通常会生成数百万个 goroutine 而没有任何问题。虽然您可以查看速率限制模式gobyexample.com/rate-limiting
  • Goroutines 非常轻量级。我至少会在花哨之前测试您的程序需要多少资源。通常数十万个同时运行的 goroutine 都不是问题。
  • @GabrielFurstenheim goroutines 的开销非常低,我发现通过生成 goroutines 而不是尝试自己管理工作来管理类似的工作队列相对容易。另一方面,它大约是数千,而不是数十万,因此它可能不适用于您的情况。但是试一试,它可以工作,这会让你的代码变得简单得多。

标签: go concurrency


【解决方案1】:

如果我正确理解了您的问题,那么您的解决方案非常复杂。这里有几点。希望对您有所帮助。

  • 正如人们在 cmets 中提到的,启动一个 goroutine 很便宜(内存和它们之间的切换都比操作系统级别的读取便宜得多),你可以拥有十万个。让我们假设您出于某些原因想要使用工作 goroutine。
  • 您可以关闭ch 频道,而不是完成频道,而不是select,您只需range 在您的频道上获取任务。
  • 我看不出将chfeedBackChannel 分开的意义,只需将您拥有的每项任务推入ch 并增加其容量即可。
  • 如前所述,当您尝试将新任务排入队列时,您可能会遇到死锁。我的解决方案非常幼稚。只要增加它的容量,直到你确定它不会溢出(如果cap(ch) - len(ch) &lt; threshold,你也可以记录警告)。如果您创建一个容量为 100 万的通道(指针),则大约需要 8 * 1e6 ~= 8MB 的 ram。

【讨论】:

  • 我想到了 feedBackChannel 的一种方法来跟踪剩余的任务数量。我不能只使用默认值进行选择,因为这意味着如果我在某些时候缺少任务,那么 goroutines 将开始关闭。这就是为什么我试图将其集中在一项任务中。大量缓冲通道实际上是非常可行的。正如您所说,通道的内存占用量非常小。在我的特殊情况下,我想做的任务是不是 IO,而是 CPU(它正在并行化一些索引),所以一直打开 goroutine 可能会产生影响。
  • @GabrielFurstenheim 不知道它是否适合您的问题,如果您合并ch 中的所有频道,您可以使用len(ch) 获得任务计数。如果以后有时间,我会尝试添加代码示例。
猜你喜欢
  • 2018-12-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-24
  • 1970-01-01
  • 2022-11-10
  • 1970-01-01
相关资源
最近更新 更多