【问题标题】:Limiting the number of concurrent tasks running限制运行的并发任务数
【发布时间】:2020-02-22 19:44:28
【问题描述】:

所以我经常用go 来解决这个问题。假设我有一个包含 100,000 行文本的文本文件。现在我想将所有这些行保存到数据库中。所以我会做这样的事情:

file, _ := iotuil.ReadFile("file.txt")

fileLines := strings.Split(string(file), "\n")

现在我将遍历文件中的所有行:

for _, l := range fileLines{
  saveToDB(l)
}

现在我想同时运行这个saveToDB func:

var wg sync.WaitGroup

for _, l := range fileLines{
  wg.Add(1)
  go saveToDB(l, &wg)
}

wg.Wait()

我不知道这是否是一个问题,但它会运行 100,000 个并发函数。有没有办法说嘿运行 100 个并发函数等待所有这些函数完成然后再运行 100 个。

for i, _ := range fileLine {
  for t = 0; t < 100; t++{
    wg.Add(1)
    go saveToDB(fileLine[i], &wg)
  }
  wg.Wait()
}

我需要做类似的事情还是有更清洁的方法来解决这个问题?还是我运行 100,000 个并发任务不是问题?

【问题讨论】:

标签: go concurrency


【解决方案1】:

我认为最好的方法是保留一个工作 goroutine 池,在通道中为它们分派工作,然后关闭通道以便它们退出。

类似这样的:

// create a channel for work "tasks"
ch := make(chan string)

wg := sync.WaitGroup{}

// start the workers
for t := 0; t < 100; t++{
    wg.Add(1)
    go saveToDB(ch, &wg)
}

// push the lines to the queue channel for processing
for _, line := range fileline {
    ch <- line
}

// this will cause the workers to stop and exit their receive loop
close(ch)

// make sure they all exit
wg.Wait()

然后 saveFunction 看起来像这样:

func saveToDB(ch chan string, wg *sync.WaitGroup) {
    // cnosume a line
    for line := range ch {
        // do work
        actuallySaveToDB(line)
    }
    // we've exited the loop when the dispatcher closed the channel, 
    // so now we can just signal the workGroup we're done
    wg.Done()
}

【讨论】:

  • 是的,我很好地利用了 Go 提供的结构。
  • 我对上述内容的一个补充是使其成为缓冲通道,因此您不会向通道写入无限的新行。 golang.org/doc/effective_go.html#channels
  • @Sean 如果没有缓冲,您将不会向通道写入无限的新行 - 您根本不会在其上发送行,除非所有工作人员都已经消耗了所有内容。缓冲通道将允许您写入多于已消耗的 N 行,这将导致在工作人员完成之前完成调度,它不会加快或减慢任何速度
猜你喜欢
  • 1970-01-01
  • 2022-10-15
  • 2011-02-23
  • 2021-07-09
  • 2014-02-20
  • 1970-01-01
  • 2019-05-14
  • 2021-06-16
  • 1970-01-01
相关资源
最近更新 更多