【问题标题】:GO code with execution control using channels使用通道进行执行控制的 GO 代码
【发布时间】:2020-10-18 03:14:41
【问题描述】:

我正在从一个长的红移表中提取所有数据块,每个块到一个 csv 文件。我想控制在“同一”时间(同时)创建多少个文件,即如果整个过程将创建 10 个文件,我想,比如说,创建 4 个文件,等到它们被创建,一旦它们“完成”,再创建 4 个,然后再创建剩下的 2 个。

如何使用 channel/s 实现这一点?

我尝试将以下切片更改为通道,但我无法让它像我所说的那样工作,我所做的实现,在创建以下文件之前没有等待/停止前 4 个文件结束那些。

现在我正在使用 WaitGroup 执行以下操作:

package, imports, var, etc...

//Inside func main:

//Create a WaitGroup
var wg = sync.WaitGroup{}

//Opening the connection
db, err := sql.Open("postgres", connStr)
if err != nil {
    panic(err.Error())
}
defer db.Close()

//Define chunks using a slice
chunkSizer := Slicer(totalRowsInTable, numberRowsByChunk) // e.g. []int{100, 100, 100...  100}

//Iterating over the array
for index, value := range chunkSizer {
    wg.Add(1)
    go ExtractorToCSV(db, queriedSection, expFileName)

    if (index+1)% 4 == 0 {   // <-- 4 is the maximum number of files created at the "same" time
            wg.Wait()
        }

    wg.Wait() // <-- waits for the remaining files (last 2 in this case)

}

//Outside main
func ExtractorToCSV(db *sql.DB, queryToExtract, fileName string) {
    //... do its process
    wg.Done()
}

我尝试使用我想停止的大小的缓冲通道(在本例中为 4 个),但我没有正确使用它,我不知道...

提前致谢。

【问题讨论】:

  • 尝试使用worker pool 模式
  • 块是否有序且平衡,即前n个块到一个文件,第二个n块到另一个文件,等等?如果没有,你可以创建nfiles goroutines 所有监听一个通道并写入他们自己的文件,你可以通过那个通道发送每条记录。
  • @BurakSerdar 谢谢,这是一个文件的一个块,但是我不知道如何使用您的方法限制/控制在“同一”时间创建多少个文件。
  • @Mike,按照我的描述,每个文件都有一个 goroutine。每个 goroutine 都必须从通道中读取并将其写入文件。当文件达到大小限制时,goroutine 关闭其文件,打开一个新文件,然后继续写入。

标签: go concurrency channel


【解决方案1】:

更新 - 停止条件

您可以像这样使用通道来保存下一行代码。这是我为您编写的最少代码。随心所欲地调整它

var doneCh = make(chan bool)

func main() {
    WRITE_POOL := 4

    for index, val := range RANGE {
        go extractToFile(val)

        if (index + 1) % WRITE_POOL == 0 {
            // wait for doneCh to finish 
            // if the iteration is divisive of WRITE_POOL
            <-doneCh
            <-doneCh
            <-doneCh
            <-doneCh
        } else if index == MAX - 1 {
            // wait for whatever doneCh left to finish 
            // if current val is the last one
            LEFT := MAX - index - 1
            for i := 0; i < LEFT; i++ {
                <-doneCh
            }
        }
    }
}

func extractToFile(val int) {
    os.Create(fmt.Sprintf("test-%d", val))
    doneCh<-true
}

为了获得更好的性能,请尝试:

  1. 创建数据通道到主函数可以将数据发送给它,ExtractorToCSV可以接收它。
  2. 创建ExtractorToCSV作为goroutine并从数据通道读取,ExtractorToCSV完成后,发送数据到doneCh
  3. 发送db数据到数据通道,在ExtractorToCSV完成写入文件后,发送true到doneCh。

如果您需要更多示例,我会更新这篇文章。

【讨论】:

  • 感谢 Fahim,但您当前的答案缺少一些关键细节(或者我对 golang 缺乏了解)。例如,如果我将 WRITE_POOL 值更改为 3,它只会在 for 循环中调用 extracToFile 直到 val = 17(手动将 &lt;-doneCh 更改为 3,否则会死锁)。它也应该为其余的调用一个 go 例程,但它没有。
  • 我已经更新了停止条件,所以如果剩下的不是WRITE_POOL的除数,那么就等待剩余的doneCh
猜你喜欢
  • 2016-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-02-17
  • 2013-05-28
  • 1970-01-01
相关资源
最近更新 更多