【发布时间】:2020-10-18 03:14:41
【问题描述】:
我正在从一个长的红移表中提取所有数据块,每个块到一个 csv 文件。我想控制在“同一”时间(同时)创建多少个文件,即如果整个过程将创建 10 个文件,我想,比如说,创建 4 个文件,等到它们被创建,一旦它们“完成”,再创建 4 个,然后再创建剩下的 2 个。
如何使用 channel/s 实现这一点?
我尝试将以下切片更改为通道,但我无法让它像我所说的那样工作,我所做的实现,在创建以下文件之前没有等待/停止前 4 个文件结束那些。
现在我正在使用 WaitGroup 执行以下操作:
package, imports, var, etc...
//Inside func main:
//Create a WaitGroup
var wg = sync.WaitGroup{}
//Opening the connection
db, err := sql.Open("postgres", connStr)
if err != nil {
panic(err.Error())
}
defer db.Close()
//Define chunks using a slice
chunkSizer := Slicer(totalRowsInTable, numberRowsByChunk) // e.g. []int{100, 100, 100... 100}
//Iterating over the array
for index, value := range chunkSizer {
wg.Add(1)
go ExtractorToCSV(db, queriedSection, expFileName)
if (index+1)% 4 == 0 { // <-- 4 is the maximum number of files created at the "same" time
wg.Wait()
}
wg.Wait() // <-- waits for the remaining files (last 2 in this case)
}
//Outside main
func ExtractorToCSV(db *sql.DB, queryToExtract, fileName string) {
//... do its process
wg.Done()
}
我尝试使用我想停止的大小的缓冲通道(在本例中为 4 个),但我没有正确使用它,我不知道...
提前致谢。
【问题讨论】:
-
尝试使用worker pool 模式
-
块是否有序且平衡,即前n个块到一个文件,第二个n块到另一个文件,等等?如果没有,你可以创建
nfilesgoroutines 所有监听一个通道并写入他们自己的文件,你可以通过那个通道发送每条记录。 -
@BurakSerdar 谢谢,这是一个文件的一个块,但是我不知道如何使用您的方法限制/控制在“同一”时间创建多少个文件。
-
@Mike,按照我的描述,每个文件都有一个 goroutine。每个 goroutine 都必须从通道中读取并将其写入文件。当文件达到大小限制时,goroutine 关闭其文件,打开一个新文件,然后继续写入。
标签: go concurrency channel