【问题标题】:Fan-in channels to single channel将频道扇入到单频道
【发布时间】:2018-12-16 17:59:25
【问题描述】:

我有多个通道 c1、c2、c3、c4 ...,如何将这些通道中的所有数据收集到一个通道中? 我的代码:

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i < 6; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)
    go func(){c <- <-c1}()
    go func(){c <- <-c2}()
    go func(){c <- <-c3}()
    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1, wg)
    go putToChannel(c2, wg)
    go putToChannel(c3, wg)
    wg.Wait()
    close(c)
    for i := range c {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

我想将所有数据从 c1, c2 ... 组合到 c 但它不起作用

【问题讨论】:

  • 为什么不直接使用共享频道?

标签: go channel


【解决方案1】:

一个干净的方法

假设我们有一个这样的无限发送者:

func msgSender(msg string) <-chan string{
    ch := make(chan string)
    go func() {
        for {
            ch <- msg
            time.Sleep(300*time.Millisecond)
        }
    }()
    return ch
}

注意msgSender() 返回一个只接收的频道

我们希望在一个频道中扇入两个(或更多)发件人:

func fanIn(){
    receiveOnlyCh1:= msgSender("msg1")
    receiveOnlyCh2:= msgSender("msg2")

    fanInCh := make(chan string)
    go func(){
        for{
            select {
            case fromSender1 := <-receiveOnlyCh1:
                fanInCh <- fromSender1
            case fromSender2 := <-receiveOnlyCh2:
                fanInCh <- fromSender2
            }
        }
    }()

    go func(){
        for {
            fmt.Println(<-fanInCh)
        }
    }()
}

输出:

msg1
msg2
msg2
msg1
msg2
msg1
msg2
msg1

https://play.golang.org/p/BhScg6nqphq

【讨论】:

    【解决方案2】:

    This article 有一篇很好的文章,介绍了如何“扇入”频道,包括停顿。

    这些行有问题:

    go func(){c <- <-c1}()
    go func(){c <- <-c2}()
    go func(){c <- <-c3}()
    

    其中每一个都将从cx 频道接收一个 值,并将该值发送到c

    你需要一个看起来像这样的方法;

    func merge(cs ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)
    
        // Start an output goroutine for each input channel in cs.  output
        // copies values from c to out until c is closed, then calls wg.Done.
        output := func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        // Start a goroutine to close out once all the output goroutines are
        // done.  This must start after the wg.Add call.
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    

    此方法依赖于这样一个事实,即当没有更多值要发送时,正在传递给 merge 的通道 cs... 将关闭。

    这意味着您还需要更新您的 putToChannel 方法

    func putToChannel(c chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        defer close(c)
        for i := 1; i < 6; i++ {
            c <- i
        }
    }
    

    最后一点值得注意的是,总的来说;尝试将创建和发送到通道的函数和关闭通道的函数封装为相同的函数。这意味着您永远不会尝试在封闭的频道上发送。

    代替:

    c1 := make(chan int, 5)
    go putToChannel(c1, wg)
    

    你可以的;

    func generator() (<-chan int) {
        c := make(chan int, 5)
        go func() {
            for i := 1; i < 6; i++ {
                 c <- i
            }
            close(c)
        }() 
        return c
    }
    

    您的主要方法将类似于:

    func main() {
        var cs []<-chan int
    
        cs = append(cs, generator())
        cs = append(cs, generator())
        cs = append(cs, generator())
    
        c := merge(cs...)
        for v := range c {
            fmt.Println(v)
        }
    }
    

    【讨论】:

      【解决方案3】:

      我已经稍微修改了你的代码,如下所示。

      基本上,在这个例子中,需要采取三个步骤:

      1. 将值放入 c1c2 和 ,c3 中,完成后不要忘记关闭它们。
      2. 使用for-range 遍历需要合并到c 的每个通道,以将值放入c。在for-range 循环之后,您需要输入wg.Done(),以便在完成每个通道的迭代后向goroutine 发出信号以最终关闭c。如果您没有关闭要合并到c 的频道之一,您将收到all goroutines are asleep - deadlock 错误。
      3. 一切完成后关闭c频道

      这里是修改后的代码:

      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func putToChannel(c chan<- int) {
          for i := 1; i < 6; i++ {
              c <- i
          }
          //close the channel after putting values in
          close(c)
      }
      
      func main() {
          c := make(chan int, 15)
          c1 := make(chan int, 5)
          c2 := make(chan int, 5)
          c3 := make(chan int, 5)
      
          output := func(ch <-chan int, wg *sync.WaitGroup) {
              //you need to iterate over the channel
              for n := range ch {
                  c <- n
              }
              wg.Done()
          }
      
          wg := new(sync.WaitGroup)
          wg.Add(3)
          go putToChannel(c1)
          go putToChannel(c2)
          go putToChannel(c3)
          go output(c1, wg)
          go output(c2, wg)
          go output(c3, wg)
      
          go func() {
              wg.Wait()
              close(c)
          }()
          for i := range c {
              fmt.Println("Receive:", i)
          }
      
          fmt.Println("Finish")
      }
      

      您可以找到更多信息here

      【讨论】:

        【解决方案4】:

        你可以这样做

        package main
        
        import (
            "fmt"
            "sync"
        )
        
        func putToChannel(c chan<- int, wg *sync.WaitGroup) {
            defer wg.Done()
            for i := 1; i < 6; i++ {
                c <- i
            }
        }
        
        func main() {
            c := make(chan int, 15)
            c1 := make(chan int, 5)
            c2 := make(chan int, 5)
            c3 := make(chan int, 5)
            send := func(c1 chan int, c2 chan int) {
                for {
                    value := <-c2
                    c1 <- value
                }
            }
            go send(c, c1)
            go send(c, c2)
            go send(c, c3)
            wg := new(sync.WaitGroup)
            wg.Add(3)
            go putToChannel(c1, wg)
            go putToChannel(c2, wg)
            go putToChannel(c3, wg)
            wg.Wait()
            for i := 0; i < 15; i++ {
                fmt.Println("Receive:", <-c)
            }
        
            fmt.Println("Finish")
        }
        

        哪些输出:

        Receive: 1
        Receive: 2
        Receive: 3
        Receive: 4
        Receive: 5
        Receive: 1
        Receive: 2
        Receive: 3
        Receive: 4
        Receive: 5
        Receive: 1
        Receive: 2
        Receive: 3
        Receive: 4
        Receive: 5
        Finish
        

        【讨论】:

          【解决方案5】:

          涉及reflect 包以提供可重用功能的解决方案。

          package main
          
          import (
              "fmt"
              "reflect"
              "sync"
          )
          
          func putToChannel(c chan<- int) {
              for i := 1; i < 6; i++ {
                  c <- i
              }
              close(c)
          }
          
          func main() {
          
              var cs []interface{}
              for i := 0; i < 3; i++ {
                  c := make(chan int, 5)
                  cs = append(cs, c)
                  go putToChannel(c)
              }
          
              c := fanin(cs...).(chan int)
              for i := range c {
                  fmt.Println("Receive:", i)
              }
              fmt.Println("Finish")
          
          }
          

          https://play.golang.org/p/b_vbLzpCDFo

          它看起来略微简化,因为它不需要明显使用 WaitGroup。当工作完成时,每个工人都有简单的责任close 输出通道。 fanin 会在之前的前提条件下解决问题。

          fanin 函数需要一个相同通道类型的空接口切片,它会查找第一项来确定输出通道的类型。然后它创建输出通道,为每个输入通道生成一个 goroutine,并将每个输入通道的项目转发到输出通道。 当所有输入通道也都关闭时,需要使用 WaitGroup 来关闭输出通道。 请注意,如果输入通道切片为空,则会出现恐慌。

          看起来像这样

          // fanin fanins all input channels into a single channel.
          // fanin is func(input ...chan T) (chan T)
          func fanin(inputs ...interface{}) interface{} {
              // note: because we take in variadic of interface, we cannot receive the trailing error channel...
              if len(inputs) < 1 {
                  panic("no channels to fanin")
              }
              rinputs := []reflect.Value{}
              for _, input := range inputs {
                  rinputs = append(rinputs, reflect.ValueOf(input))
              }
              out := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, rinputs[0].Type().Elem()), 0)
              var wg sync.WaitGroup
              for i := 0; i < len(rinputs); i++ {
                  rinput := rinputs[i]
                  wg.Add(1)
                  go func() {
                      defer wg.Done()
                      for {
                          x, ok := rinput.Recv()
                          if !ok {
                              break
                          }
                          out.Send(x)
                      }
                  }()
              }
              go func() {
                  wg.Wait()
                  out.Close()
              }()
              return out.Convert(rinputs[0].Type()).Interface()
          }
          

          如果你一直把事情弄干,你最终可能会得到一个fanout 函数,如下所示。

          https://play.golang.org/p/0Zi9h4XPbbV

          package main
          
          import (
              "fmt"
              "reflect"
              "sync"
          )
          
          func putToChannel() chan int {
              c := make(chan int, 5)
              go func() {
                  for i := 1; i < 6; i++ {
                      c <- i
                  }
                  close(c)
              }()
              return c
          }
          
          func main() {
              for i := range fanout(3, putToChannel).(chan int) {
                  fmt.Println("Receive:", i)
              }
          
              fmt.Println("Finish")
          }
          

          通过这样的实现,fanout 函数看起来像

          // fanout spawns n instances of workers.
          // worker is a func() chan A
          // it fanins all output channels and return a chan A
          // fanout is func(n int, worker func() chan A) chan A
          func fanout(n int, worker interface{}) interface{} {
              rworker := reflect.ValueOf(worker)
              if rworker.Kind() != reflect.Func {
                  panic("not a func")
              }
              var outchans []interface{}
              var in []reflect.Value
              for i := 0; i < n; i++ {
                  res := rworker.Call(in)
                  outchans = append(outchans, res[0].Interface())
              }
              return fanin(outchans...)
          }
          

          【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2015-01-25
          • 2019-02-19
          • 1970-01-01
          • 2016-06-07
          • 2013-12-25
          • 2017-03-18
          • 1970-01-01
          • 2021-10-12
          相关资源
          最近更新 更多