【问题标题】:GoLang: Decompress bz2 in on goroutine, consume in other goroutineGoLang:在goroutine中解压bz2,在其他goroutine中消费
【发布时间】:2016-03-25 21:53:36
【问题描述】:

我是一名刚毕业的 SWE,正在学习 Go(并且喜欢它)。

我正在为 Wikipedia 转储文件构建一个解析器 - 基本上是一个巨大的 bzip2 压缩 XML 文件(约 50GB 未压缩)。

我想同时进行流解压和解析,这听起来很简单。为了减压,我这样做:

inputFilePath := flag.Arg(0) inputReader := bzip2.NewReader(inputFile)

然后将阅读器传递给XML解析器:

decoder := xml.NewDecoder(inputFile)

但是,由于解压缩和解析都是昂贵的操作,我希望它们在单独的 Go 例程上运行以利用额外的内核。我将如何在 Go 中执行此操作?

我唯一能想到的就是将文件包装在一个 chan []byte 中,并实现 io.Reader 接口,但我认为可能有一种构建方式(和更简洁)的方式来实现它。

有人做过这样的事吗?

谢谢! 曼努埃尔

【问题讨论】:

    标签: xml go concurrency goroutine bzip2


    【解决方案1】:

    你可以使用io.Pipe,然后使用io.Copy将解压后的数据推入管道,在另一个goroutine中读取:

    package main
    
    import (
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "sync"
    )
    
    func main() {
    
        rawJson := []byte(`{
                "Foo": {
                    "Bar": "Baz"
                }
            }`)
    
        bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader
    
        var wg sync.WaitGroup
        wg.Add(2)
    
        r, w := io.Pipe()
    
        go func() {
            // write everything into the pipe. Decompression happens in this goroutine.
            io.Copy(w, bzip2Reader)
            w.Close()
            wg.Done()
        }()
    
        decoder := json.NewDecoder(r)
    
        go func() {
            for {
                t, err := decoder.Token()
                if err != nil {
                    break
                }
                fmt.Println(t)
            }
            wg.Done()
        }()
    
        wg.Wait()
    }
    

    http://play.golang.org/p/fXLnfnaWYA

    【讨论】:

    • 这正是我所需要的,谢谢!不幸的是,stardard lib bzip2 解压缩器的性能似乎不是很好,所以它仍然是限制因素。我可能会切换到这个压缩器:godoc.org/github.com/dsnet/compress/bzip2 但是,它仍然比 pbzip2 之类的慢 33%。
    • @ManuelMenzella,你最终得到了多少加速?我喜欢这段代码的外观——它似乎应该可以工作,但在我的测试中,它只比做所有单线程的事情快一点(67 秒对 1M 记录上的 72 秒)。知道我可能做错了什么吗,@user1431317?
    • 也许它仍然受限于 bzip2 解压缩可以多快地提供数据,并且 xml 解码并没有占用那么多 cpu 功率。管道可能会增加一些开销,尽管 io.Copy 确实在一端或两端是 io.Reader/io.Writer 时进行了优化。它可能分配了很多小的临时缓冲区,这会造成太多垃圾。也许缓冲的读者或作家会有所帮助。您应该分析您的应用程序(cpu 和内存配置文件 - 内存配置文件可以帮助您找到许多不必要的分配)。
    • bzip2 解压器和解析器所用的时间大致相同——我通过单独运行它们进行了测试。我也试过缓冲所有可以想象的东西,但没有帮助。好吧,除了 io.Pipe 之外的所有内容,我怀疑这 可能 是问题所在。我已经发布了一个单独的问题:stackoverflow.com/questions/45089248/…
    • 缓冲 PipeReader 和 PipeWriter 也无济于事。嗯……你(或其他人)有没有试过这个并发现比单线程版本有显着的加速?
    【解决方案2】:

    一个简单的解决方案是使用我之前创建的预读包:https://github.com/klauspost/readahead

    inputReader := bzip2.NewReader(inputFile)
    ra := readahead.NewReader(input)
    defer ra.Close()
    

    然后将阅读器传递给XML解析器:

    decoder := xml.NewDecoder(ra)
    

    使用默认设置,它将在 4 个缓冲区中提前解码多达 4MB。

    【讨论】:

      猜你喜欢
      • 2020-03-18
      • 2015-12-07
      • 2022-01-16
      • 1970-01-01
      • 1970-01-01
      • 2021-08-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多