【问题标题】:When trying to implement an io.Reader that uses channels I get a fatal error尝试实现使用通道的 io.Reader 时出现致命错误
【发布时间】:2020-10-10 10:41:15
【问题描述】:

对于这个非常简单的问题,我很抱歉,只是在理解我们如何实施 io.Reader 时遇到了一些麻烦。

我的最终用例是我正在消费一个可以随时发送数据的无限流。为了模拟这一点,我创建了一个实现io.Readerio.Writeremitter

我同时使用bufio.Scanner 收听阅读器,同时从 main 向任何监听器发送值。

游乐场: https://goplay.space/#eJfe0HyfYrL


func main() {
    wg := &sync.WaitGroup{}
    wg.Add(1)

    data := newEmitter()

    go func() {
        scanner := bufio.NewScanner(data)
        for scanner.Scan() {
            fmt.Println(scanner.Text())
        }
        wg.Done()
    }()

    data.WriteString("foo")
    time.Sleep(2 * time.Second)
    data.WriteString("bar")

    wg.Wait()
}

我的emitter

type emitter struct {
    ch chan []byte
}

func (em *emitter) Read(b []byte) (int, error) {
    n := copy(b, <-em.ch)
    return n, nil
}

func (em *emitter) Write(b []byte) (int, error) {
    em.ch <- b
    return len(b), nil
}

func (em *emitter) WriteString(s string) (int, error) {
    return em.Write([]byte(s))
}

func newEmitter() *emitter {
    return &emitter{
        ch: make(chan []byte),
    }
}

我收到以下错误

fatal error: all goroutines are asleep - deadlock!

【问题讨论】:

  • 哦,太好了! io.Pipe 正是我想要做的

标签: go


【解决方案1】:

emitter.Read 中的 goroutine 在从&lt;-em.ch 接收时阻塞。 wg.Wait 上的主要 goroutine 块。死锁!

通过向发射器添加Close() 方法来关闭em.ch 进行修复。当通道关闭时,在emitter.Read 中返回io.EOFClose发送数据完成后的发射器。

emitter 还存在其他问题。不要修复emitter,而是使用io.Pipe 将作者连接到阅读器:

wg := &sync.WaitGroup{}
wg.Add(1)

pr, pw := io.Pipe()

go func() {
    scanner := bufio.NewScanner(pr)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
    wg.Done()
}()

io.WriteString(pw, "foo\n")
time.Sleep(2 * time.Second)
io.WriteString(pw, "bar\n")
pw.Close()

wg.Wait()

Run it on the playground.

【讨论】:

    猜你喜欢
    • 2016-06-14
    • 1970-01-01
    • 2018-08-29
    • 1970-01-01
    • 2013-06-16
    • 1970-01-01
    • 2023-03-14
    • 2012-11-01
    • 1970-01-01
    相关资源
    最近更新 更多