【发布时间】:2019-12-08 21:05:42
【问题描述】:
我有这个基于pipelines 示例的代码。 walkFiles 获取一个或多个文件夹(在folders 变量中指定)并“访问”作为参数给出的所有文件夹中的文件。它还需要一个done 频道来允许取消,但我认为这对这个问题并不重要。
当只传递一个要遍历的文件夹时,代码按预期工作。但是当给定两个时,它给了我臭名昭著的fatal error: all goroutines are asleep - deadlock! 错误。它甚至看起来通过处理两个文件夹的文件正在做正确的事情,但它并没有很好地结束。我在这个函数的并发中犯了什么(可能很明显)错误?
代码如下:
type result struct {
path string
checksum []byte
err error
}
type FileData struct {
Hash []byte
}
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func (p Processor) walkFiles(done <-chan struct{}, folders []string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
visit := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
}
var wg sync.WaitGroup
for i, folder := range folders {
wg.Add(1)
go func(f string, i int) {
defer wg.Done()
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(f, visit)
}(folder, i)
}
go func() {
wg.Wait()
close(paths)
}()
return paths, errc
}
func closeFile(f *os.File) {
err := f.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
// processor reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func (p Processor) process(done <-chan struct{}, files <-chan string, c chan<- result, loc *locator.Locator) {
for f := range files {
func() {
file, err := os.Open(f.path)
if err != nil {
fmt.Println(err)
return
}
defer closeFile(file)
// Hashing file, producing `checksum` variable, and an `err`
select {
case c <- result{f.path, checksum, err}:
case <-done:
return
}
}()
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func (p Processor) MD5All(folders []string) (map[string]FileData, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
paths, errc := p.walkFiles(done, folders)
c := make(chan result)
var wg sync.WaitGroup
wg.Add(NUM_DIGESTERS)
for i := 0; i < NUM_DIGESTERS; i++ {
go func() {
p.process(done, paths, c, loc)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
// End of pipeline. OMIT
m := make(map[string]FileData)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = FileData{r.checksum}
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
func (p Processor) Start() map[string]FileData {
m, err := p.MD5All(p.folders)
if err != nil {
log.Fatal(err)
}
return m
}
【问题讨论】:
-
显示在
paths和errc上接收的代码。使用紧急处理程序打印的堆栈跟踪指示 groutine 在代码中的哪个位置被阻塞。 -
没有人在读取 errc。
-
您只从 errc 读取一次,但所有 groutine 都在写入它。一旦读取了第一个完成的 goroutine 的 errc,所有其他的都卡在等待写入它。
-
@BurakSerdar 就是这样,谢谢 :) 如果你想把它作为答案,我会接受它!
标签: go concurrency goroutine