【问题标题】:Panic: Send on a closed channel when running goroutine in for loop(恐慌:在 for 循环中运行 goroutine 时向已关闭的通道发送)
【发布时间】:2022-11-11 23:00:58
【问题描述】:

我正在尝试制作 grep 的并发版本。该程序遍历目录/子目录,并返回与所提供模式匹配的所有字符串。

一旦我有所有要搜索的文件(请参阅searchPaths 函数),我正在尝试同时运行文件搜索。最初我得到:

fatal error: all goroutines are asleep - deadlock

直到我在 searchPaths 的末尾添加了close(out),它现在返回:

panic: send on closed channel

我正在尝试实现类似于:

https://go.dev/blog/pipelines#fan-out-fan-in

是不是我在错误的地方关闭了通道(channel)?

package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// SearchResult holds the text of one matching line together with its line
// number.
type SearchResult struct {
    line       string // full text of the matching line
    lineNumber int // line number of the match; searchLine stores its index argument plus one
}

// Display pairs a SearchResult with the path of the file it was found in.
type Display struct {
    filePath string // path of the file that contains the match
    SearchResult // embedded, so line and lineNumber are accessible directly on Display
}

// wg counts the search goroutines started by searchPaths; main waits on it.
var wg sync.WaitGroup

// PrettyPrint writes the match to standard output as three labelled lines
// (line number, file path, line text) followed by a blank line.
func (d Display) PrettyPrint() {
    const layout = "Line Number: %v\nFilePath: %v\nLine: %v\n\n"
    fmt.Printf(layout, d.lineNumber, d.filePath, d.line)
}

// searchLine reports whether line contains pattern as a plain substring.
// On a match it returns a SearchResult holding the line and lineNumber+1,
// together with true; otherwise it returns the zero SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if !strings.Contains(line, pattern) {
        return SearchResult{}, false
    }
    match := SearchResult{line: line, lineNumber: lineNumber + 1}
    return match, true
}

// splitIntoLines breaks file into lines by splitting on "\n". The separators
// are dropped; an empty input yields a slice holding one empty string.
func splitIntoLines(file string) []string {
    return strings.Split(file, "\n")
}

// fileFromPath reads the whole file at path and returns its contents as a
// string. A read error is handed to log.Fatal, which terminates the program.
func fileFromPath(path string) string {
    data, readErr := ioutil.ReadFile(path)
    if readErr == nil {
        return string(data)
    }
    log.Fatal(readErr)
    return string(data) // not reached in practice: log.Fatal exits the process
}

// getRecursiveFilePaths walks the tree rooted at inputDir and returns the path
// of every entry that is not a directory. If an entry cannot be accessed the
// error is printed and returned to filepath.Walk; the paths gathered so far
// are still returned to the caller.
func getRecursiveFilePaths(inputDir string) []string {
    var files []string
    visit := func(path string, info fs.FileInfo, err error) error {
        if err != nil {
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
            return err
        }
        if info.IsDir() {
            return nil
        }
        files = append(files, path)
        return nil
    }
    if walkErr := filepath.Walk(inputDir, visit); walkErr != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, walkErr)
    }
    return files
}

// searchPaths starts one goroutine per path. Each goroutine sends every match
// that searchFile returns on the out channel and calls wg.Done when it
// finishes. The channel is returned receive-only.
//
// NOTE(review): close(out) below executes as soon as the loop has launched the
// goroutines, without waiting for them. A goroutine that sends afterwards hits
// a closed channel, which panics. The close must happen only after every
// sender is done, i.e. after wg.Wait() has returned.
func searchPaths(paths []string, pattern string) <-chan Display {
    out := make(chan Display)

    for _, path := range paths {
        wg.Add(1)
        // NOTE(review): the closure reads the loop variable path directly; with
        // Go versions before 1.22 all iterations share that one variable.
        go func() {
            defer wg.Done()
            for _, display := range searchFile(path, pattern) {
                out <- display
            }
        }()
    }
    close(out) // runs without waiting for the goroutines above to finish sending
    return out
}

// searchFile loads the file at path, checks each of its lines against pattern,
// and returns one Display per matching line, in file order.
func searchFile(path string, pattern string) []Display {
    var matches []Display
    for i, text := range splitIntoLines(fileFromPath(path)) {
        hit, found := searchLine(pattern, text, i)
        if !found {
            continue
        }
        matches = append(matches, Display{filePath: path, SearchResult: hit})
    }
    return matches
}

// main takes the search pattern from os.Args[1] and the root directory from
// os.Args[2], searches every file found under that directory, and prints each
// match.
//
// NOTE(review): the channel made in searchPaths is unbuffered, so calling
// wg.Wait() before the range loop means no receiver is running while the
// search goroutines try to send. os.Args is also indexed without a length
// check.
func main() {
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)
    wg.Wait() // blocks until every search goroutine has called wg.Done
    for d := range out {
        d.PrettyPrint()
    }

}

【问题讨论】:

  • 应该由发送方负责关闭通道,以避免向已关闭的通道发送(这会导致运行时恐慌)。如果有多个发送者,则必须对它们进行协调,只有在所有发送者都完成之后才能关闭通道。您的 wg.Wait() 放错了位置。参见:Closing channel of unknown length
  • close(out); return out 是一个直接的危险信号:返回刚刚关闭因此无法使用的通道是没有意义的。

标签: go concurrency goroutine


【解决方案1】:

这段代码的两个主要问题是

  1. 您需要在 wg.Wait() 返回之后再关闭通道。您可以在单独的 goroutine 中执行此操作,如下所示。
  2. 由于 searchPaths 函数中的 path 变量是 for 循环的循环变量,会在每次迭代时被重新赋值,因此在 goroutine 中直接使用该变量不是好的做法;更好的方法是将它作为参数传给 goroutine。
    package main
    
    import (
        "fmt"
        "io/fs"
        "io/ioutil"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
    )
    
    // SearchResult holds the text of one matching line together with its line
    // number.
    type SearchResult struct {
        line       string // full text of the matching line
        lineNumber int // line number of the match; searchLine stores its index argument plus one
    }
    
    // Display pairs a SearchResult with the path of the file it was found in.
    type Display struct {
        filePath string // path of the file that contains the match
        SearchResult // embedded, so line and lineNumber are accessible directly on Display
    }
    
    // wg counts the search goroutines started by searchPaths; main waits on it
    // before closing the results channel.
    var wg sync.WaitGroup
    
    func (d Display) PrettyPrint() {
        fmt.Printf("Line Number: %v
    FilePath: %v
    Line: %v
    
    ", d.lineNumber, d.filePath, d.line)
    }
    
    // searchLine reports whether line contains pattern as a plain substring.
    // On a match it returns a SearchResult holding the line and lineNumber+1,
    // together with true; otherwise it returns the zero SearchResult and false.
    func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
        var result SearchResult
        found := strings.Contains(line, pattern)
        if found {
            result.line = line
            result.lineNumber = lineNumber + 1
        }
        return result, found
    }
    
    // splitIntoLines breaks file into lines by splitting on "\n". The
    // separators are dropped; an empty input yields a slice holding one empty
    // string.
    func splitIntoLines(file string) []string {
        // The separator must be the \n escape: a raw line break inside an
        // interpreted string literal does not compile.
        return strings.Split(file, "\n")
    }
    
    // fileFromPath reads the whole file at path and returns its contents as a
    // string. A read error is handed to log.Fatal, which terminates the program.
    func fileFromPath(path string) string {
        data, readErr := ioutil.ReadFile(path)
        if readErr == nil {
            return string(data)
        }
        log.Fatal(readErr)
        return string(data) // not reached in practice: log.Fatal exits the process
    }
    
    // getRecursiveFilePaths walks the tree rooted at inputDir and returns the
    // path of every entry that is not a directory. If an entry cannot be
    // accessed the error is printed and returned to filepath.Walk; the paths
    // gathered so far are still returned to the caller.
    func getRecursiveFilePaths(inputDir string) []string {
        var paths []string
        err := filepath.Walk(inputDir, func(path string, info fs.FileInfo, err error) error {
            if err != nil {
                // Line breaks in the messages are \n escapes; a raw line break
                // inside an interpreted string literal does not compile.
                fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
                return err
            }
            if !info.IsDir() {
                paths = append(paths, path)
            }
            return nil
        })
        if err != nil {
            fmt.Printf("Error walking the path %q: %v\n", inputDir, err)
        }
        return paths
    }
    
    // searchPaths starts one goroutine per path. Each goroutine sends every
    // match that searchFile returns on the returned channel and marks itself
    // done on the package-level wg. The channel is not closed here; the caller
    // closes it after wg.Wait() returns.
    func searchPaths(paths []string, pattern string) chan Display {
        results := make(chan Display)

        // The path is handed to the worker as an argument so that each
        // goroutine works on its own copy rather than on the loop variable.
        worker := func(file string, done *sync.WaitGroup) {
            defer done.Done()
            matches := searchFile(file, pattern)
            for i := range matches {
                results <- matches[i]
            }
        }

        for i := range paths {
            wg.Add(1)
            go worker(paths[i], &wg)
        }
        return results
    }
    
    // searchFile loads the file at path, checks each of its lines against
    // pattern, and returns one Display per matching line, in file order.
    func searchFile(path string, pattern string) []Display {
        var matches []Display
        for i, text := range splitIntoLines(fileFromPath(path)) {
            hit, found := searchLine(pattern, text, i)
            if !found {
                continue
            }
            matches = append(matches, Display{filePath: path, SearchResult: hit})
        }
        return matches
    }
    
    // main takes the search pattern from os.Args[1] and the root directory from
    // os.Args[2], searches every file found under that directory concurrently,
    // and prints a running counter followed by each match as it arrives.
    func main() {
        // Indexing os.Args without this check panics when an argument is missing.
        if len(os.Args) < 3 {
            fmt.Fprintln(os.Stderr, "usage: <program> <pattern> <directory>")
            os.Exit(2)
        }
        pattern := os.Args[1]
        dirPath := os.Args[2]

        paths := getRecursiveFilePaths(dirPath)

        out := searchPaths(paths, pattern)

        // Close the channel only after every sender has finished. This runs in
        // its own goroutine so the receive loop below can start draining the
        // channel while the searches are still in progress.
        go func() {
            wg.Wait()
            close(out)
        }()

        count := 0
        for d := range out {
            fmt.Println(count)
            d.PrettyPrint()
            count++
        }
    }
    

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-20
    • 1970-01-01
    • 1970-01-01
    • 2016-04-26
    相关资源
    最近更新 更多