【问题标题】:How to collect values from N goroutines executed in a specific order?如何从按特定顺序执行的 N 个 goroutine 中收集值?
【发布时间】:2016-10-17 19:15:27
【问题描述】:

下面是 Stuff 类型的结构。它有三个整数。一个Number,它的Double 和它的Power。假设计算给定整数列表的 double 和 power 是一项昂贵的计算。

type Stuff struct {
    Number int
    Double int
    Power  int
}

func main() {
    nums := []int{2, 3, 4} // given numbers
    stuff := []Stuff{}     // struct of stuff with transformed ints

    double := make(chan int)
    power := make(chan int)

    for _, i := range nums {
        go doubleNumber(i, double)
        go powerNumber(i, power)
    }

    // How do I get the values back in the right order?

    fmt.Println(stuff)
}

func doubleNumber(i int, c chan int) {
    c <- i + i
}

func powerNumber(i int, c chan int) {
    c <- i * i
}

fmt.Println(stuff) 的结果应该与初始化内容相同:

stuff := []Stuff{
    {Number: 2, Double: 4, Power: 4}
    {Number: 3, Double: 6, Power: 9}
    {Number: 4, Double: 8, Power: 16}
}

我知道我可以使用&lt;- double&lt;- power 从通道中收集值,但我不知道什么双数/幂属于什么数字。

【问题讨论】:

    标签: go channel goroutine


    【解决方案1】:

    Goroutines 并发、独立地运行,因此如果没有显式同步,您无法预测执行和完成顺序。因此,您不能将返回的数字与输入的数字配对。

    您可以返回更多数据(例如,输入数字和输出,例如包装在结构中),或者将指针传递给工作函数(作为新的 goroutines 启动),例如*Stuff 并让 goroutine 在 Stuff 本身中填充计算数据。

    返回更多数据

    我将使用chan Pair 的频道类型,其中Pair 是:

    type Pair struct{ Number, Result int }
    

    所以计算将如下所示:

    func doubleNumber(i int, c chan Pair) { c <- Pair{i, i + i} }
    
    func powerNumber(i int, c chan Pair) { c <- Pair{i, i * i} }
    

    我将使用map[int]*Stuff,因为可收集的数据来自多个渠道(doublepower),我想轻松快速地找到合适的Stuff(需要指针,所以我也可以修改它“在地图中”)。

    所以主函数:

    nums := []int{2, 3, 4} // given numbers
    stuffs := map[int]*Stuff{}
    
    double := make(chan Pair)
    power := make(chan Pair)
    
    for _, i := range nums {
        go doubleNumber(i, double)
        go powerNumber(i, power)
    }
    
    // How do I get the values back in the right order?
    for i := 0; i < len(nums)*2; i++ {
        getStuff := func(number int) *Stuff {
            s := stuffs[number]
            if s == nil {
                s = &Stuff{Number: number}
                stuffs[number] = s
            }
            return s
        }
    
        select {
        case p := <-double:
            getStuff(p.Number).Double = p.Result
        case p := <-power:
            getStuff(p.Number).Power = p.Result
        }
    }
    
    for _, v := range nums {
        fmt.Printf("%+v\n", stuffs[v])
    }
    

    输出(在Go Playground 上试试):

    &{Number:2 Double:4 Power:4}
    &{Number:3 Double:6 Power:9}
    &{Number:4 Double:8 Power:16}
    

    使用指针

    由于现在我们正在传递*Stuff 值,我们可以在Stuff 本身中“预填充”输入数字。

    但必须小心,您只能通过适当的同步读取/写入值。最简单的方法是等待所有“worker” goroutine 完成它们的工作。

    var wg = &sync.WaitGroup{}
    
    func main() {
        nums := []int{2, 3, 4} // given numbers
    
        stuffs := make([]Stuff, len(nums))
        for i, n := range nums {
            stuffs[i].Number = n
            wg.Add(2)
            go doubleNumber(&stuffs[i])
            go powerNumber(&stuffs[i])
        }
        wg.Wait()
        fmt.Printf("%+v", stuffs)
    }
    
    func doubleNumber(s *Stuff) {
        defer wg.Done()
        s.Double = s.Number + s.Number
    }
    
    func powerNumber(s *Stuff) {
        defer wg.Done()
        s.Power = s.Number * s.Number
    }
    

    输出(在Go Playground上试试):

    [{Number:2 Double:4 Power:4} {Number:3 Double:6 Power:9} {Number:4 Double:8 Power:16}]
    

    同时写入不同的切片元素

    另外请注意,由于您可以同时写入不同的数组或切片元素(详情请参阅Can I concurrently write different slice elements),您可以将结果直接写入没有通道的切片中。请参阅 Refactor code to use a single channel in an idiomatic way 如何做到这一点。

    【讨论】:

    • 这似乎是一个很好的答案! +1。此解决方案(使用通道的解决方案)是否适用于 N 个子流程?
    • @JorgeBucaran 是的,但使用缓冲通道。对于大量进程/作业,我会使用 producer-consumer 模式,其中工作 goroutine 的数量是固定的并且被很好地隔离。例如,请参阅我在Bruteforce MD5 Password cracker 的回答。
    • 我对这个解决方案的问题是它需要创建一个Pair 结构。我没有足够的经验来正确地说,但我的“直觉”表明@Vatine 的想法是要走的路。
    • @JorgeBucaran 我选择Pair 而不是Stuff 的原因是为了明确接收到的值不是完整的Stuff。接收到的Stuff 只是真实Stuff 的一部分可能会产生误导,因为 Double 和 Power 是由不同的 goroutine 计算并从不同的通道接收的。所以你必须从多个不完整的东西中放回一个“真实”的东西。
    • 如果还有 Triple、Log、Cube 等,每个转换都可以派生自同一个数,在创建任何 goroutine 之前就知道,并且应该独立于其他执行。跨度>
    【解决方案2】:

    就个人而言,我会使用chan Stuff 将结果传回,然后启动goroutines 计算完整的Stuff 并将其传回。如果您需要同时计算单个 Stuff 的各个部分,您可以使用专用通道从每个 goroutine 生成 goroutine。收集完所有结果后,您可以(可选)使用累积值对切片进行排序。

    我在下面的意思示例(原则上,您可以使用sync.WaitGroup 来协调事物,但如果输入计数已知,则严格来说不需要它)。

    type Stuff struct {
      number int64
      double int64
      square int64
    }
    
    // Compute a Stuff with individual computations in-line, send it out
    func computeStuff(n int64, out chan<- Stuff) {
       rv := Stuff{number: n}
       rv.double = n * 2
       rv.square = n * n
       out <- rv
    }
    
    // Compute a Stuff with individual computations concurrent
    func computeStuffConcurrent(n int64, out chan<- Stuff) {
      rv := Stuff{number: n}
      dc := make(chan int64)
      sc := make(chan int64)
      defer close(dc)
      defer close(sc)
      go double(n, dc)
      go square(n, sc)
      rv.double = <-dc
      rv.square = <-sc
      out <- rv
    }
    
    func double(n int64, result chan<- int) {
       result <- n * 2
    }
    
    func square(n int64, result chan<- int) {
      result <- n * n
    }
    
    func main() {
      inputs := []int64{1, 2, 3}
      results := []Stuff{}
      resultChannel := make(chan Stuff)
    
      for _, input := range inputs {
        go computeStuff(input, resultChannel) 
        // Or the concurrent version, if the extra performance is needed
      }
    
      for c := 0; c < len(inputs); c++ {
        results = append(results, <- resultChannel)
      }
      // We now have all results, sort them if you need them sorted
    }
    

    【讨论】:

      【解决方案3】:

      Ordered-concurrently 是一个 go 模块,它同时处理工作并按照提供的顺序返回数据。 https://github.com/tejzpr/ordered-concurrently

      示例 - https://play.golang.org/p/hkcIuRHj63h

      package main
      
      import (
          concurrently "github.com/tejzpr/ordered-concurrently/v2"
          "log"
          "math/rand"
          "time"
      )
      
      type loadWorker int
      
      // The work that needs to be performed
      // The input type should implement the WorkFunction interface
      func (w loadWorker) Run() interface{} {
          time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)))
          return w
      }
      
      func main() {
          max := 10
          inputChan := make(chan concurrently.WorkFunction)
          output := concurrently.Process(inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
          go func() {
              for work := 0; work < max; work++ {
                  inputChan <- loadWorker(work)
              }
              close(inputChan)
          }()
          for out := range output {
              log.Println(out.Value)
          }
      }
      
      

      免责声明:我是模块创建者

      【讨论】:

      • 我无法重现当前示例的死锁,欢迎您继续讨论您在 Github 中提出的问题
      • 我搭上了我的 cmets。感谢修复它。
      猜你喜欢
      • 2018-08-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-12-10
      • 1970-01-01
      • 1970-01-01
      • 2017-10-03
      相关资源
      最近更新 更多