【问题标题】:What is the neatest idiom for producer/consumer in Go?Go 中生产者/消费者最简洁的成语是什么?
【发布时间】:2012-06-20 00:16:15
【问题描述】:

我想做的是拥有一组生产者 goroutine(其中一些可能完成也可能不完成)和一个消费者例程。问题在于括号中的警告 - 我们不知道将返回答案的总数。

所以我想做的是:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // If we include a close, then that's WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don't close, then that's WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

所以问题是,如果我关闭它是错误的,如果我不关闭它仍然是错误的(参见代码中的 cmets)。

现在,解决方案将是一个带外信号通道,所有生产者都写入:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // This is basically a 'join'.
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

这完全符合我的要求!但对我来说,这似乎是一口。我的问题是:有什么成语/技巧可以让我以更简单的方式做类似的事情吗?

我看过这里:http://golang.org/doc/codewalk/sharemem/ 似乎complete chan(在main 的开头初始化)在一个范围内使用但从未关闭。我不明白怎么做。

如果有人有任何见解,我将不胜感激。干杯!


编辑:fls0815 有答案,并且还回答了无关闭通道范围如何工作的问题。

我上面的代码修改后可以工作(在 fls0815 提供代码之前完成):

package main

import (
  "fmt"
  "math/rand"
  "sync"
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf("Producer produced: %d\n", num)
    }
    wg_cons.Done()
  } ()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println("All done.")
}

【问题讨论】:

    标签: concurrency go


    【解决方案1】:

    只有生产者才能关闭频道。您可以通过调用消费者来实现您的目标,消费者在您的生产者启动后在结果通道上迭代 (range)。在您的主线程中,您等待(参见sync.WaitGroup)直到您的消费者/生产者完成他们的工作。生产者完成后,您关闭生成的通道,这将强制您的消费者退出(range 将在通道关闭且没有任何缓冲项目时退出)。

    示例代码:

    package main
    
    import (
        "log"
        "sync"
        "time"
        "math/rand"
        "runtime"
    )
    
    func consumer() {
        defer consumer_wg.Done()
    
        for item := range resultingChannel {
            log.Println("Consumed:", item)
        }
    }
    
    func producer() {
        defer producer_wg.Done()
    
        success := rand.Float32() > 0.5
        if success {
            resultingChannel <- rand.Int()
        }
    }
    
    var resultingChannel = make(chan int)
    var producer_wg sync.WaitGroup
    var consumer_wg sync.WaitGroup
    
    func main() {
        rand.Seed(time.Now().Unix())
    
        for c := 0; c < runtime.NumCPU(); c++ {
            producer_wg.Add(1)  
            go producer()
        }
    
        for c := 0; c < runtime.NumCPU(); c++ {
            consumer_wg.Add(1)
            go consumer()
        }
    
        producer_wg.Wait()
    
        close(resultingChannel)
    
        consumer_wg.Wait()
    }
    

    我之所以把close-statement 放到 main 函数中是因为我们有不止一个生产者。在上面的示例中关闭一个生产者中的通道会导致您已经遇到的问题(在关闭的通道上写入;原因是可能剩下一个生产者仍在生产数据)。只有在没有生产者时才应关闭频道(因此我建议仅由生产者关闭频道)。这就是在 Go 中构建通道的方式。 Here你会发现更多关于关闭频道的信息。


    与 sharemem 示例相关:AFAICS 此示例通过一次又一次地重新排队资源(从挂起 -> 完成 -> 挂起 -> 完成......等等)来无限运行。这就是 main-func 末尾的迭代所做的。它接收完成的资源并使用 Resource.Sleep() 将它们重新排队以待处理。当没有完成的资源时,它会等待并阻塞新资源的完成。因此无需关闭通道,因为它们一直在使用。

    【讨论】:

    • 您好,感谢您的回答 - 这确实是我想要的。您能否扩展您的建议,即“只有制作人应该关闭频道”。 - 这听起来像是常识/代码有意义的规则,但我想知道是否还有技术原因(因为您列出的代码示例具有关闭频道的主要功能)。再次感谢!
    • 嗯,没错,这很有道理。我想也许这可能是一个硬性规则 - 每个生产者都必须检查是否允许关闭频道(因此最后一个完成关闭它)。这显然比在我们的示例中仅在 main() 中关闭它要混乱得多(有更多不必要的检查),但我担心这是做事的方式(出于某种我不知道的原因)。我仍在尝试了解风格的最佳做法。
    • 添加了有关 sharemem 示例的更多信息。
    【解决方案2】:

    总是有很多方法可以解决这些问题。这是一个使用 Go 中基本的简单同步通道的解决方案。没有缓冲通道,没有关闭通道,没有 WaitGroups。

    它离你的“满口”解决方案真的不远,而且——很抱歉让你失望——也没有那么小。它确实将消费者置于自己的 goroutine 中,以便消费者可以在生产者生产数字时消费数字。它还区分了生产“尝试”可以以成功或失败告终。如果生产失败,则立即进行尝试。如果成功,则在消耗完数字之前不会进行尝试。

    package main
    
    import (
        "fmt"
        "math/rand"
    )
    
    func producer(c chan int, fail chan bool) {
        if success := rand.Float32() > 0.5; success {
            c <- rand.Int()
        } else {
            fail <- true
        }
    }
    
    func consumer(c chan int, success chan bool) {
        for {
            num := <-c
            fmt.Printf("Producer produced: %d\n", num)
            success <- true
        }
    }
    
    func main() {
        const nTries = 10
        c := make(chan int)
        done := make(chan bool)
        for i := 0; i < nTries; i++ {
            go producer(c, done)
        }
        go consumer(c, done)
    
        for i := 0; i < nTries; i++ {
            <-done
        }
        fmt.Println("All done.")
    }
    

    【讨论】:

      【解决方案3】:

      我之所以添加这个,是因为现有的答案并不能说明一些事情。首先,codewalk 示例中的范围循环只是一个无限事件循环,它会不断地重新检查和更新同一个 url 列表。

      接下来,通道本身已经 Go 中惯用的消费者-生产者队列。支持通道的异步缓冲区的大小决定了生产者在获得背压之前可以生产多少。在下面设置 N = 0 以查看锁步生产者消费者,而没有任何人领先或落后。事实上,N = 10 将让生产者在阻塞之前生产多达 10 个产品。

      最后,在 Go 中编写通信顺序进程有一些很好的习惯用法(例如,为您启动 go 例程的函数,以及使用 for/select 模式来通信和接受控制命令)。我认为 WaitGroups 很笨拙,并希望看到惯用的示例。

      package main
      
      import (
          "fmt"
          "time"
      )
      
      type control int
      const  (
          sleep control = iota
          die // receiver will close the control chan in response to die, to ack.
      )
      
      func (cmd control) String() string {
          switch cmd {
          case sleep: return "sleep"
          case die: return "die"
          }
          return fmt.Sprintf("%d",cmd)
      }
      
      func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) {
          var product int
          go func() {
              for {
                  select {
              case writechan <- product:
                  fmt.Printf("Producer produced %v\n", product)
                  product++
              case cmd:= <- ctrl:
                  fmt.Printf("Producer got control cmd: %v\n", cmd)
                  switch cmd {
                  case sleep:
                      fmt.Printf("Producer sleeping 2 sec.\n")
                      time.Sleep(2000 * time.Millisecond)
                  case die:
                      fmt.Printf("Producer dies.\n")
                      close(done)
                      return
                  }
                  }
              }
          }()
      }
      
      func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) {
          go func() {
              var product int
              for {
                  select {
                  case product = <-readchan:
                      fmt.Printf("Consumer consumed %v\n", product)
                  case cmd:= <- ctrl:
                      fmt.Printf("Consumer got control cmd: %v\n", cmd)
                      switch cmd {
                      case sleep:
                          fmt.Printf("Consumer sleeping 2 sec.\n")
                          time.Sleep(2000 * time.Millisecond)
                      case die:
                          fmt.Printf("Consumer dies.\n")
                          close(done)
                          return
                      }
      
                  }
              }
          }()
      }
      
      func main() {
      
          N := 10
          q := make(chan int, N)
      
          prodCtrl := make(chan control)
          consCtrl := make(chan control)
      
          prodDone := make(chan bool)
          consDone := make(chan bool)
      
      
          ProduceTo(q, prodCtrl, prodDone)
          ConsumeFrom(q, consCtrl, consDone)
      
          // wait for a moment, to let them produce and consume
          timer := time.NewTimer(10 * time.Millisecond)
          <-timer.C
      
          // tell producer to pause
          fmt.Printf("telling producer to pause\n")
          prodCtrl <- sleep
      
          // wait for a second
          timer = time.NewTimer(1 * time.Second)
          <-timer.C
      
          // tell consumer to pause
          fmt.Printf("telling consumer to pause\n")
          consCtrl <- sleep
      
      
          // tell them both to finish
          prodCtrl <- die
          consCtrl <- die
      
          // wait for that to actually happen
          <-prodDone
          <-consDone
      }
      

      【讨论】:

        【解决方案4】:

        如果您使用带有 fanIn 函数的生成器模式,您可以使用没有等待组的简单无缓冲通道。

        在生成器模式中,每个生产者返回一个通道并负责关闭它。然后,fanIn 函数遍历这些通道,并将它们返回的值转发到它返回的单个通道。

        问题当然是 fanIn 函数在每个通道关闭时转发通道类型(int)的零值。

        您可以通过使用您的频道类型的零值作为哨兵值来解决此问题,并且如果它们不是零值,则仅使用来自 fanIn 频道的结果。

        这是一个例子:

        package main
        
        import (
            "fmt"
            "math/rand"
        )
        
        const offset = 1
        
        func producer() chan int {
            cout := make(chan int)
            go func() {
                defer close(cout)
                // May or may not produce.
                success := rand.Float32() > 0.5
                if success {
                    cout <- rand.Int() + offset
                }
            }()
            return cout
        }
        
        func fanIn(cin []chan int) chan int {
            cout := make(chan int)
            go func() {
                defer close(cout)
                for _, c := range cin {
                    cout <- <-c
                }
            }()
            return cout
        }
        
        func main() {
            chans := make([]chan int, 0)
            for i := 0; i < 10; i++ {
                chans = append(chans, producer())
            }
        
            for num := range fanIn(chans) {
                if num > offset {
                    fmt.Printf("Producer produced: %d\n", num)
                }
            }
            fmt.Println("All done.")
        }
        

        【讨论】:

          【解决方案5】:

          producer-consumer 是一种很常见的模式,我写了一个库prosumer 方便仔细处理chan 通信。例如:

          func main() {
              maxLoop := 10
              var wg sync.WaitGroup
              wg.Add(maxLoop)
              defer wg.Wait()
          
              consumer := func(ls []interface{}) error {
                  fmt.Printf("get %+v \n", ls)
                  wg.Add(-len(ls))
                  return nil
              }
          
              conf := prosumer.DefaultConfig(prosumer.Consumer(consumer))
              c := prosumer.NewCoordinator(conf)
              c.Start()
              defer c.Close(true)
          
              for i := 0; i < maxLoop; i++ {
                  fmt.Printf("try put %v\n", i)
                  discarded, err := c.Put(i)
                  if err != nil {
                      fmt.Errorf("discarded elements %+v for err %v", discarded, err)
                      wg.Add(-len(discarded))
                  }
                  time.Sleep(time.Second)
              }
          
          }
          

          close有一个叫graceful的参数,表示是否抽掉底层的chan。

          【讨论】:

            猜你喜欢
            • 2014-01-11
            • 1970-01-01
            • 1970-01-01
            • 2023-03-31
            • 1970-01-01
            • 1970-01-01
            • 2022-01-05
            • 2013-03-25
            • 1970-01-01
            相关资源
            最近更新 更多