【问题标题】:How to multiplex channel output in go如何在go中复用通道输出
【发布时间】:2017-12-01 22:57:01
【问题描述】:

我正在寻找在 go 中多路复用某些通道输出的解决方案。

我有一个数据源,它是从 io.Reader 读取的数据,我将其发送到单个通道。另一方面,我有一个从通道读取的 websocket 请求处理程序。现在碰巧有两个客户端创建了一个 websocket 连接,都从同一个通道读取,但每个客户端都只获取部分消息。

代码示例(简化):

func (b *Bootloader) ReadLog() (<-chan []byte, error) {
    if b.logCh != nil {
        logrus.Warn("ReadLog called while channel already exists!")
        return b.logCh, nil // This is where we get problems
    }

    b.logCh = make(chan []byte, 0)

    go func() {
        buf := make([]byte, 1024)
        for {
            n, err := b.p.Read(buf)

            if err == nil {
                msg := make([]byte, n)
                copy(msg, buf[:n])
                b.logCh <- msg
            } else {
                break
            }
        }

        close(b.logCh)
        b.logCh = nil
    }()

    return b.logCh, nil
}

现在当ReadLog()被调用两次时,第二次调用只是返回了第一次调用中创建的通道,这就导致了上面解释的问题。

问题是:如何正确复用?

关心发送或接收站点上的多路复用是否更好/更容易/更符合理念?

我应该对接收者隐藏频道并使用回调吗?

我现在有点卡住了。欢迎任何提示。

【问题讨论】:

  • 你追求的是所谓的“扇出”,你可以开始here
  • 不,扇出是提问者得到但不想要的:多个客户端从同一个频道读取。

标签: go channel multiplexing


【解决方案1】:

多路复用非常简单:制作一个要多路复用的通道切片,启动一个从原始通道读取的 goroutine 并将每个消息复制到切片中的每个通道:

// Really this should be in Bootloader but this is just an example
var consumers []chan []byte

func (b *Bootloader) multiplex() {
    // We'll use a sync.once to make sure we don't start a bunch of these.
    sync.Once(func(){ 
        go func() {
            // Every time a message comes over the channel...
            for v := range b.logCh {
                // Loop over the consumers...
                for _,cons := range consumers {
                    // Send each one the message
                    cons <- v
                }
            }
        }()
    })
}

【讨论】:

猜你喜欢
  • 2015-02-20
  • 1970-01-01
  • 1970-01-01
  • 2019-11-14
  • 2016-08-02
  • 2019-05-14
  • 2013-01-28
  • 2016-08-09
  • 2018-05-29
相关资源
最近更新 更多