涉及reflect 包以提供可重用功能的解决方案。
package main
import (
"fmt"
"reflect"
"sync"
)
func putToChannel(c chan<- int) {
for i := 1; i < 6; i++ {
c <- i
}
close(c)
}
func main() {
var cs []interface{}
for i := 0; i < 3; i++ {
c := make(chan int, 5)
cs = append(cs, c)
go putToChannel(c)
}
c := fanin(cs...).(chan int)
for i := range c {
fmt.Println("Receive:", i)
}
fmt.Println("Finish")
}
https://play.golang.org/p/b_vbLzpCDFo
它看起来略微简化,因为它不需要明显使用 WaitGroup。当工作完成时,每个工人都有简单的责任close 输出通道。 fanin 会在之前的前提条件下解决问题。
fanin 函数需要一个相同通道类型的空接口切片,它会查找第一项来确定输出通道的类型。然后它创建输出通道,为每个输入通道生成一个 goroutine,并将每个输入通道的项目转发到输出通道。
当所有输入通道也都关闭时,需要使用 WaitGroup 来关闭输出通道。
请注意,如果输入通道切片为空,则会出现恐慌。
看起来像这样
// fanin fanins all input channels into a single channel.
// fanin is func(input ...chan T) (chan T)
func fanin(inputs ...interface{}) interface{} {
// note: because we take in variadic of interface, we cannot receive the trailing error channel...
if len(inputs) < 1 {
panic("no channels to fanin")
}
rinputs := []reflect.Value{}
for _, input := range inputs {
rinputs = append(rinputs, reflect.ValueOf(input))
}
out := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, rinputs[0].Type().Elem()), 0)
var wg sync.WaitGroup
for i := 0; i < len(rinputs); i++ {
rinput := rinputs[i]
wg.Add(1)
go func() {
defer wg.Done()
for {
x, ok := rinput.Recv()
if !ok {
break
}
out.Send(x)
}
}()
}
go func() {
wg.Wait()
out.Close()
}()
return out.Convert(rinputs[0].Type()).Interface()
}
如果你一直把事情弄干,你最终可能会得到一个fanout 函数,如下所示。
https://play.golang.org/p/0Zi9h4XPbbV
package main
import (
"fmt"
"reflect"
"sync"
)
func putToChannel() chan int {
c := make(chan int, 5)
go func() {
for i := 1; i < 6; i++ {
c <- i
}
close(c)
}()
return c
}
func main() {
for i := range fanout(3, putToChannel).(chan int) {
fmt.Println("Receive:", i)
}
fmt.Println("Finish")
}
通过这样的实现,fanout 函数看起来像
// fanout spawns n instances of workers.
// worker is a func() chan A
// it fanins all output channels and return a chan A
// fanout is func(n int, worker func() chan A) chan A
func fanout(n int, worker interface{}) interface{} {
rworker := reflect.ValueOf(worker)
if rworker.Kind() != reflect.Func {
panic("not a func")
}
var outchans []interface{}
var in []reflect.Value
for i := 0; i < n; i++ {
res := rworker.Call(in)
outchans = append(outchans, res[0].Interface())
}
return fanin(outchans...)
}