【问题标题】:A channel multiplexer通道多路复用器
【发布时间】:2013-10-12 02:38:51
【问题描述】:

注意 - Go 新手。

我编写了一个多路复用器,应该将一组通道的输出合并为一个。乐于接受建设性的批评。

func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    n := len(channels)
    // The channel to output to.
    ch := make(chan big.Int, n)

    // Make one go per channel.
    for _, c := range channels {
        go func() {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            n -= 1
            // Close output if all closed now.
            if n == 0 {
                close(ch)
            }
        }()
    }
    return ch
}

我正在测试它:

func fromTo(f, t int) chan big.Int {
    ch := make(chan big.Int)

    go func() {
        for i := f; i < t; i++ {
            fmt.Println("Feed:", i)
            ch <- *big.NewInt(int64(i))
        }
        close(ch)
    }()
    return ch
}

func testMux() {
    r := make([]chan big.Int, 10)
    for i := 0; i < 10; i++ {
        r[i] = fromTo(i*10, i*10+10)
    }
    all := Mux(r)
    // Roll them out.
    for l := range all {
        fmt.Println(l)
    }
}

但我的输出很奇怪:

Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}

所以我的问题:

  • 我在 Mux 中做错了什么?
  • 为什么我只能从输出通道获得最后 10 个?
  • 为什么喂食看起来如此奇怪? (每个输入通道的第一个,最后一个通道,然后什么都没有)
  • 有更好的方法吗?

我需要所有输入通道对输出通道具有平等的权利 - 即我不能从一个通道获得所有输出,然后从下一个通道获得所有输出,等等。


对于任何感兴趣的人 - 这是修复后的最终代码以及正确(大概)使用sync.WaitGroup

import (
    "math/big"
    "sync"
)

/*
  Multiplex a number of channels into one.
*/
func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    var wg sync.WaitGroup
    wg.Add(len(channels))
    // The channel to output to.
    ch := make(chan big.Int, len(channels))

    // Make one go per channel.
    for _, c := range channels {
        go func(c <-chan big.Int) {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            wg.Done()
        }(c)
    }
    // Close the channel when the pumping is finished.
    go func() {
        // Wait for everyone to be done.
        wg.Wait()
        // Close.
        close(ch)
    }()
    return ch
}

【问题讨论】:

    标签: go channel


    【解决方案1】:

    Mux 生成的每个 goroutines 最终都会从同一个通道中提取,因为 c 在循环的每次迭代中都会更新——它们不只是捕获 c 的值。如果像这样将通道传递给 goroutine,您将获得预期的结果:

    for _, c := range channels {
        go func(c <-chan big.Int) {
            ...
        }(c)
    }
    

    你可以测试这个修改here

    另一个可能的问题是您对n 变量的处理:如果您使用GOMAXPROCS != 1 运行,您可能有两个goroutines 试图同时更新它。 sync.WaitGroup 类型是等待 goroutine 完成的更安全的方式。

    【讨论】:

    • 谢谢 - 这完全解释了我的问题。结果是否会始终如一地为任何架构上的所有渠道提供平等的权利?
    • 你是在问每次喂ch的goroutine是否会被公平地安排?我不知道这是否已定义。如果您需要特定的结果交错,您可能需要更多的东西。
    • 我担心在某些环境中,每个频道可能会在下一个频道被查看之前耗尽。必须避免这种情况。我不需要特定的顺序,但我需要所有渠道之间的公平平衡。
    【解决方案2】:

    事后,我知道,但我写了一个包,它实现了一个类似于这个的通用 Multiplex 函数。它使用反射包中的“select”调用来确保高效且平衡的多路复用,而无需任何锁定或等待组。

    【讨论】:

      【解决方案3】:

      James Hentridge 答案的基础上,使用range 语句时处理重新分配问题的惯用方法是将局部变量分配给风险值:

      for _, c := range channels {
          c := c
          go func() {
          ...
          }()
      }
      

      【讨论】:

      • 这不是处理此问题的惯用方式。请使用 James Hentridge 的回答。
      猜你喜欢
      • 2015-10-09
      • 1970-01-01
      • 1970-01-01
      • 2014-07-05
      • 2016-04-30
      • 2016-04-17
      • 1970-01-01
      • 2014-07-27
      • 1970-01-01
      相关资源
      最近更新 更多