【问题标题】:Worker pool for a potentially recursive task (i.e., each job can queue other jobs)潜在递归任务的工作池(即,每个作业都可以排队其他作业)
【发布时间】:2015-04-13 22:37:54
【问题描述】:

我正在编写一个应用程序,用户可以从多个“工作”(实际上是 URL)开始。在开始时(主程序),我将这些 URL 添加到队列中,然后启动 x 个处理这些 URL 的 goroutine。

在特殊情况下,URL 指向的资源可能包含更多必须添加到队列中的 URL。 3 名工人正在等待新的工作进入并处理它们。问题是:一旦每个工人都在等待工作(并且没有人在生产任何工作),工人应该完全停止。所以要么全部工作,要么没有人工作。

我当前的实现看起来像这样,我不认为它很优雅。不幸的是,我想不出一个不包含竞争条件的更好方法,而且我不完全确定这个实现是否真的按预期工作:

var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
  // This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
  one := false
  for {
    select {
    case u, ok := <-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // I have started working (delta + 1)
        working <- 1
        absent <- struct{}{}
        one = true
      }
      // do work with u (which may lead to queue.Push(urls...))
    case <-absent: // no jobs at the moment. consume absent => wait
      one = false
      working <- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i < WORKER_COUNT; i++ {
    go work(working)
  }
  // the amount of actually working workers...
  sum := 0
  for {
    delta := <-working
    sum += delta
    if sum == 0 {
      queue.Close() // close channel -> kill workers.
      done <- struct{}{}
      return
    }
  }
}

有没有更好的方法来解决这个问题?

【问题讨论】:

  • 简短版:您已经明白了,但sync.WaitGroup 是处理您使用working 频道进行的簿记的更简单方法。
  • 好吧,我实际上认为在某处读到,在主线程等待后添加到等待组是一个坏主意。阅读文档后,我想我看错了......
  • 罗杰。在答案中发布了sync.WaitGroup 示例(以及我必须这样做的代码的链接),供任何可能偶然发现此问题的人使用。
  • 要求更具体:工人总数不得超过WORKER_COUNT(选项是让WORKER_COUNT - 1工人工作+主例程。另外,假设每个工作都消失了(不剩下的工作),并且当前正在处理的最后一个工作会产生另外 100 个工作,您的解决方案会为此工作吗?意思是:WORKER_COUNT 工人会处理即将到来的 100 个工作,还是会减少到 1 个工作为那份工作?
  • 是的,新版本会处理所有这些,请查看下面的内容——它恰好启动了四个工作人员,当一个工作人员无法将子任务卸载到另一个工作人员时,它会立即执行(即同步)。

标签: concurrency go synchronization worker


【解决方案1】:

您可以use a sync.WaitGroup(请参阅docs)控制工作人员的生命周期,并使用非阻塞发送,以便工作人员在尝试排队更多作业时不会死锁:

package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
    // do the job, calling enqueue() for subtasks as needed
}

func main() {
    jobs, wg := make(chan job), new(sync.WaitGroup)
    var enqueue func(job)

    // workers
    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    // how to queue a job
    enqueue = func(j job) {
        wg.Add(1)
        select {
        case jobs <- j: // another worker took it
        default: // no free worker; do the job now
            j.do(enqueue)
            wg.Done()
        }
    }

    todo := make([]job, 1000)
    for _, j := range todo {
        enqueue(j)
    }
    wg.Wait()
    close(jobs)
}

使用缓冲通道避免死锁的困难在于,您必须预先分配一个足够大的通道,以确保在不阻塞的情况下容纳所有待处理的任务。除非您有少量已知的 URL 需要抓取,否则会出现问题。

当您退回到在当前线程中执行普通递归时,您没有静态缓冲区大小限制。当然,仍然存在限制:如果有太多的工作待处理,您可能会用完 RAM,理论上您可以通过深度递归耗尽堆栈(但这很难!)。因此,如果您正在抓取整个网络,则需要以更复杂的方式跟踪待处理的任务。

最后,作为一个更完整的例子,我对这段代码并不感到非常自豪,但我碰巧写了一个function to kick off a parallel sort,它是递归的,就像你的 URL 获取一样。

【讨论】:

  • 谢谢!这正是我一直在寻找的。在考虑了您的解决方案后,我将其包含在我的解决方案中。实际上,我有点“幸运”,因为我使用的队列是我自己编写的包装器,因此我可以将 WaitGroup 集成到所述队列中。任何从队列中弹出内容的“工作人员”都负责调用“Done()”,并且每当某项被添加到队列中时,队列都会将 wg 增加 1。这个解决方案要好得多。这是我的代码现在的样子。非常感谢! gist.github.com/muja/073ac7e3eb9de5c00611
猜你喜欢
  • 1970-01-01
  • 2012-08-08
  • 1970-01-01
  • 2013-04-02
  • 2021-09-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多