【问题标题】:Is it possible to multiplex several channels into one?是否可以将多个通道复用为一个?
【发布时间】:2012-06-14 07:53:29
【问题描述】:

这个想法是在一个切片中拥有可变数量的通道,将通过它们接收到的每个值推送到单个通道中,并在最后一个输入通道关闭后关闭此输出通道。像这样,但对于两个以上的频道:

func multiplex(cin1, cin2, cout chan int) {
    n := 2
    for {
        select {
        case v, ok := <-cin1:
            if ok {
                cout <- v
            } else {
                n -= 1
            }

        case v, ok := <-cin2:
            if ok {
                cout <- v
            } else {
                n -= 1
            }
        }

        if n == 0 {
            close(cout)
            break
        }
    }
}

上面的代码避免了忙循环,因为没有default case,这很好(编辑:看起来“,ok”的存在使得select语句非阻塞并且循环毕竟是忙的。但是为了这个例子,把代码想象成它会阻塞)。是否也可以通过任意数量的输入通道来实现相同的功能?显然,这可以通过将切片成对地减少到单个通道来完成,但如果可能的话,我会对更简单的解决方案更感兴趣。

【问题讨论】:

    标签: concurrency go channel


    【解决方案1】:

    编辑:添加了成对归约示例代码并重新排序了部分答案。

    首选的解决方案是不回答“重组,让您没有渠道切片”。重构通常可以利用多个 goroutine 可以发送到单个通道的特性。因此,不必让每个源在单独的通道上发送,然后不得不处理从一堆通道接收,只需创建一个通道并让所有源在该通道上发送。

    Go 不提供从通道切片接收的功能。这是一个经常被问到的问题,虽然刚刚给出的解决方案是首选的,但有一些方法可以对其进行编程。我认为您在原始问题中建议的解决方案是“成对减少切片”是二进制分而治之的解决方案。只要您有将两个通道多路复用为一个的解决方案,它就可以正常工作。您的示例代码非常接近工作。

    您只是缺少一个小技巧来使您的示例代码正常工作。在减少 n 的地方,添加一行将通道变量设置为 nil。比如我把代码读了

        case v, ok := <-cin1:
            if ok {
                cout <- v
            } else {
                n--
                cin1 = nil
            }
        case v, ok := <-cin2:
            if ok {
                cout <- v
            } else {
                n--
                cin2 = nil
            }
        }
    

    这个解决方案做你想做的,而不是忙于等待。

    那么,将这个解决方案整合到一个多路复用切片的函数中的完整示例:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func multiplex(cin []chan int, cout chan int) {
        var cin0, cin1 chan int
        switch len(cin) {
        case 2:
            cin1 = cin[1]
            fallthrough
        case 1:
            cin0 = cin[0]
        case 0:
        default:
            cin0 = make(chan int)
            cin1 = make(chan int)
            half := len(cin) / 2
            go multiplex(cin[:half], cin0)
            go multiplex(cin[half:], cin1)
        }
        for cin0 != nil || cin1 != nil {
            select {
            case v, ok := <-cin0:
                if ok {
                    cout <- v
                } else {
                    cin0 = nil
                }
            case v, ok := <-cin1:
                if ok {
                    cout <- v
                } else {
                    cin1 = nil
                }
            }
        }
        close(cout)
    }
    
    func main() {
        cin := []chan int{
            make(chan int),
            make(chan int),
            make(chan int),
        }
        cout := make(chan int)
        for i, c := range cin {
            go func(x int, cx chan int) {
                for i := 1; i <= 3; i++ {
                    time.Sleep(100 * time.Millisecond)
                    cx <- x*10 + i
                }
                close(cx)
            }(i, c)
        }
        go multiplex(cin, cout)
        for v := range cout {
            fmt.Println("main gets", v)
        }
    }
    

    【讨论】:

    • 不,不完全是。我正在寻找带有签名func multiplex(cin []chan int, cout chan int) 的函数,即一个可以在任意数量的输入通道上运行而不是硬编码为两个的函数。
    • func multiplex() 是递归的。如果您查看default 的情况,它会使用数组的一半调用自身,直到减少到两个。
    【解决方案2】:

    我相信这个 sn-p 可以满足您的需求。我已经更改了签名,因此很明显输入和输出只能用于一个方向的通信。注意添加了sync.WaitGroup,您需要某种方式让所有输入都表明它们已完成,这很容易。

    func combine(inputs []<-chan int, output chan<- int) {
      var group sync.WaitGroup
      for i := range inputs {
        group.Add(1)
        go func(input <-chan int) {
          for val := range input {
            output <- val
          }
          group.Done()
        } (inputs[i])
      }
      go func() {
        group.Wait()
        close(output)
      } ()
    }
    

    【讨论】:

    【解决方案3】:

    我使用 goroutines 制作了这个。是你想要的吗?

    package main
    
    import (
        "fmt"
    )
    
    func multiplex(cin []chan int, cout chan int) {
        n := len(cin)
        for _, ch := range cin {
            go func(src chan int) {
                for {
                    v, ok := <-src
                    if ok {
                        cout <- v
                    } else {
                        n-- // a little dangerous. Maybe use a channel to avoid missed decrements
                        if n == 0 {
                            close(cout)
                        }
                        break
                    }
                }
            }(ch)
        }
    }
    
    // a main to test the multiplex
    func main() {
        cin := make([]chan int, 3)
        cin[0] = make(chan int, 2)
        cin[1] = make(chan int, 2)
        cin[2] = make(chan int, 2)
        cout := make(chan int, 2)
        multiplex(cin, cout)
        cin[1] <- 1
        cin[0] <- 2
        cin[2] <- 3
        cin[1] <- 4
        cin[0] <- 5
        close(cin[1])
        close(cin[0])
        close(cin[2])
        for {
            v, ok := <-cout
            if ok {
                fmt.Println(v)
            } else {
                break
            }
        }
    }
    

    编辑:参考:

    http://golang.org/ref/spec#Receive_operator

    http://golang.org/ref/spec#Close

    【讨论】:

    • 文档说,如果您从带有“,ok”的通道读取值,则操作不会阻塞。然后ok 的值就是false 并且继续执行。如果这是正确的(我是 Go 新手,不太清楚),那么如果通道是空的但尚未关闭,if ok 行将评估为false 并执行else 分支。但是,如果您将 "v, ok := if 替换为 select 语句,那么它可能会起作用。得测试一下。谢谢你的回复,顺便说一句。
    • 您在哪里读到操作不会阻塞?我没有找到它,它似乎与我观察到的不符。我从文档中读到它不会阻塞 一旦通道关闭
    • 这似乎来自旧版本的规范,例如here,看看“方法表达式”之前的最后一段。在当前版本中,这段话稍作改动,并说“返回零值,因为通道是关闭和空的(假)”。这听起来像false 只有在通道被排空和关闭后才会返回,对吧?那意味着我错了。
    猜你喜欢
    • 1970-01-01
    • 2021-06-27
    • 1970-01-01
    • 2023-03-30
    • 1970-01-01
    • 2021-07-07
    • 1970-01-01
    • 1970-01-01
    • 2017-08-18
    相关资源
    最近更新 更多