【问题标题】:how to listen to N channels? (dynamic select statement)怎么听N个频道? (动态选择语句)
【发布时间】:2013-11-28 07:55:11
【问题描述】:

要开始执行两个 goroutine 的无限循环,我可以使用以下代码:

在收到消息后,它将启动一个新的 goroutine 并永远继续下去。

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

我现在希望 N 个 goroutine 具有相同的行为,但在这种情况下 select 语句会如何?

这是我开始的代码位,但我对如何编写 select 语句感到困惑

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

【问题讨论】:

标签: go


【解决方案1】:

您可以使用reflect 包中的Select 函数来执行此操作:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select 执行由案例列表描述的选择操作。喜欢 Go select 语句,它会阻塞,直到至少有一种情况可以 继续,做出统一的伪随机选择,然后执行 案子。它返回所选案例的索引,如果该案例是 接收操作,接收到的值和一个布尔值,指示是否 该值对应于通道上的发送(而不是零 由于通道关闭而收到的值)。

您传入一个 SelectCase 结构数组,这些结构标识要选择的通道、操作的方向以及在发送操作的情况下要发送的值。

所以你可以这样做:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

您可以在此处尝试更充实的示例:http://play.golang.org/p/8zwvSk4kjx

【讨论】:

  • 这种选择的案例数量是否有实际限制?如果超过它,性能会受到严重影响?
  • 也许是我的无能,但是当您通过通道发送和接收复杂结构时,我发现这种模式真的很难使用。正如 Tim Allclair 所说,通过一个共享的“聚合”频道对我来说要容易得多。
【解决方案2】:

您可以通过将每个通道包装在一个 goroutine 中来实现这一点,该 goroutine 将消息“转发”到共享的“聚合”通道。例如:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

如果您需要知道消息来自哪个通道,您可以在将其转发到聚合通道之前将其包装在一个带有任何额外信息的结构中。

在我的(有限的)测试中,这种方法的性能大大优于使用反射包:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

基准代码here

【讨论】:

  • 您的基准代码不正确,您需要在基准中使用to loop over b.N。否则结果(在您的输出中除以b.N、1 和 2000000000)将完全没有意义。
  • @DaveC 谢谢!结论没有改变,但结果更加理智。
  • 确实,我对您的基准代码进行了快速破解以获得some actual numbers。这个基准测试可能仍然缺少/错误的东西,但更复杂的反射代码唯一要做的就是设置更快(使用 GOMAXPROCS=1),因为它不需要一堆 goroutine。在其他所有情况下,一个简单的 goroutine 合并通道都会破坏反射解决方案(大约 2 个数量级)。
  • 一个重要的缺点(与 reflect.Select 方法相比)是执行合并缓冲区的 goroutine 在每个被合并的通道上至少有一个值。通常这不是问题,但在某些特定的应用程序中可能会破坏交易:(。
  • 缓冲的合并通道使问题变得更糟。问题是只有反射解决方案才能具有完全无缓冲的语义。我已经继续发布了我正在试验的测试代码,作为单独的答案(希望)澄清我想说的话。
【解决方案3】:

为了在以前的答案上扩展一些 cmets 并提供更清晰的比较,这里提供了迄今为止提出的两种方法的示例,其中给出了相同的输入、要读取的通道切片和调用每个值的函数,这也需要知道值来自哪个渠道。

这些方法之间存在三个主要区别:

  • 复杂性。尽管部分原因可能是读者偏好,但我发现频道方法更惯用、更直接、更易读。

  • 性能。在我的 Xeon amd64 系统上,goroutines+channels out 执行反射解决方案大约两个数量级(一般来说,Go 中的反射通常较慢,只应在绝对需要时使用)。当然,如果函数处理结果或将值写入输入通道时有任何明显的延迟,那么这种性能差异很容易变得微不足道。

  • 阻塞/缓冲语义。这一点的重要性取决于用例。大多数情况下,这无关紧要,或者 goroutine 合并解决方案中的轻微额外缓冲可能有助于吞吐量。但是,如果希望只有一个写入器被解除阻塞并且它的值在任何其他写入器被解除阻塞之前完全处理,那么这只能通过反射解决方案来实现。

注意,如果不需要发送通道的“id”或者源通道永远不会关闭,这两种方法都可以简化。

Goroutine 合并通道:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

反射选择:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[完整代码on the Go playground.]

【讨论】:

  • 另外值得注意的是,goroutines+channels 解决方案不能做selectreflect.Select 所做的一切。 goroutines 将继续旋转,直到它们消耗掉通道中的所有内容,因此没有明确的方法可以让Process1 提前退出。如果您有多个阅读器,也可能会出现问题,因为 goroutine 会从每个通道中缓冲一个项目,而 select 不会发生这种情况。
  • @JamesHenstridge,您关于停止的第一个说明是不正确的。您将安排停止 Process1 的方式与安排停止 Process2 的方式完全相同;例如通过添加一个“停止”通道,该通道在 goroutine 应该停止时关闭。 Process1 需要在 for 循环中使用两个案例 select,而不是当前使用的更简单的 for range 循环。 Process2 需要在cases 中添加另一个案例并特殊处理i 的值。
  • 这仍然不能解决您正在从不会在提前停止的情况下使用的通道中读取值的问题。
【解决方案4】:

可能更简单的选项:

为什么不将一个通道作为参数传递给在单独的 goroutine 上运行的函数,然后在消费者 goroutine 中监听通道,而不是拥有一组通道?

这允许您在侦听器中仅选择一个通道,进行简单的选择,并避免创建新的 goroutine 来聚合来自多个通道的消息?

【讨论】:

    【解决方案5】:

    我们实际上对这个主题进行了一些研究,并找到了最佳解决方案。我们使用了一段时间reflect.Select,它是解决问题的好方法。它比每个通道的 goroutine 轻得多,并且操作简单。但不幸的是,它并没有真正支持大量的频道,我们就是这样,所以我们发现了一些有趣的东西并写了一篇关于它的博客文章:https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/

    我将总结那里写的内容: 我们为最多 32 次幂的每个结果的每个结果静态创建了一批 select..case 语句,以及一个路由到不同案例并通过聚合通道聚合结果的函数。

    此类批次的示例:

    func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
        select {
        case r.v, r.ok = <-chanz[0]:
            r.i = i + 0
            res <- r
        case r.v, r.ok = <-chanz[1]:
            r.i = i + 1
            res <- r
        case r.v, r.ok = <-chanz[2]:
            r.i = i + 2
            res <- r
        case r.v, r.ok = <-chanz[3]:
            r.i = i + 3
            res <- r
        case <-ctx.Done():
            break
        }
    }
    

    以及使用这些select..case 批次从任意数量的通道聚合第一个结果的逻辑:

        for i < len(channels) {
            l = len(channels) - i
            switch {
            case l > 31 && maxBatchSize >= 32:
                go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
                i += 32
            case l > 15 && maxBatchSize >= 16:
                go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
                i += 16
            case l > 7 && maxBatchSize >= 8:
                go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
                i += 8
            case l > 3 && maxBatchSize >= 4:
                go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
                i += 4
            case l > 1 && maxBatchSize >= 2:
                go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
                i += 2
            case l > 0:
                go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
                i += 1
            }
        }
    

    【讨论】:

    • maxBatchSize 如何设置? (另外,你有一个错字:maxBachSize。我会修正它,但编辑队列已满,而且在博客文章中仍然是错误的。)
    • maxBatchSize 如果您想根据您对批次性质的一些了解进行优化,可以将其设置为配置。当我想测试哪个最大批量大小是最优的时,我提出了这种改进。我希望你能从我分享的博客文章中理解为什么。您可以在您的实现中删除它并决定您喜欢的最大批次。我们最终选择了 64,因为它在数千个频道中表现最好,这就是我们的情况。顺便说一句,感谢您指出错字!我已修复它,并将在博客文章中进行修复。
    【解决方案6】:

    假设有人发送事件,为什么这种方法行不通?

    func main() {
        numChans := 2
        var chans = []chan string{}
    
        for i := 0; i < numChans; i++ {
            tmp := make(chan string)
            chans = append(chans, tmp)
        }
    
        for true {
            for i, c := range chans {
                select {
                case x = <-c:
                    fmt.Printf("received %d \n", i)
                    go DoShit(x, i)
                default: continue
                }
            }
        }
    }
    

    【讨论】:

    • 这是一个自旋循环。在等待输入通道有值时,这会消耗所有可用的 CPU。 select 在多个通道上的全部意义(没有default 子句)是它有效地等待直到至少一个准备好而不旋转。
    猜你喜欢
    • 2013-11-20
    • 2013-12-27
    • 2021-09-28
    • 2014-11-27
    • 2021-04-03
    • 2013-05-29
    • 2020-09-09
    • 2017-12-29
    • 2017-09-25
    相关资源
    最近更新 更多