【问题标题】:In golang, how to write a pipeline stage that introduces a delay for the next stage?在golang中,如何编写一个为下一个阶段引入延迟的管道阶段?
【发布时间】:2018-04-01 23:12:32
【问题描述】:

我正在关注https://blog.golang.org/pipelines 文章来实现几个阶段。

我需要一个阶段在事件传递到管道的下一个阶段之前引入几秒钟的延迟。

我对下面代码的担忧是,在传递事件之前,time.Sleep() 会产生无限数量的 go 例程。有没有更好的方法来做到这一点?

谢谢!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) {
    out := make(chan *Bar, 10000)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // inChan closed
                    break
                }
                wg.Add(1)
                go func() {
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                }()
            }
        }
        wg.Wait()
    }()
    return out
}

【问题讨论】:

    标签: asynchronous go concurrency channel goroutine


    【解决方案1】:

    我已经用我的pipeline library 解决了这样的问题,就像这样:

        import "github.com/nazar256/parapipe"
        //...
        pipeline := parapipe.NewPipeline(10).
        Pipe(func(msg interface{}) interface{} {
            //some code
        }).
        Pipe(func(msg interface{}) interface{} {
            time.Sleep(3*time.Second)
            return msg
        }).
        Pipe(func(msg interface{}) interface{} {
            //some other code
        })
    

    【讨论】:

      【解决方案2】:

      这是您应该用于管道应用程序的内容。上下文允许更快的拆卸。

      负责管理您的in 频道必须在拆除期间关闭它。 始终关闭您的频道。

      // Delay delays each `interface{}` coming in through `in` by `duration`.
      // If the context is canceled, `in` will be flushed until it closes.
      // Delay is very useful for throtteling back CPU usage in your pipelines.
      func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{} {
          out := make(chan interface{})
          go func() {
              // Correct memory management
              defer close(out)
      
              // Keep reading from in until its closed
              for i := range in {
                  // Take one element from in and pass it to out
                  out <- i
      
                  select {
                  // Wait duration before reading from in again
                  case <-time.After(duration):
      
                  // Don't wait if the context is canceled
                  case <-ctx.Done():
                  }
              }
          }()
          return out
      }
      

      【讨论】:

        【解决方案3】:

        您可以手动修复 goroutine 的数量 - 仅从您需要的数量开始。

        func sleepStage(in <-chan *Bar) (out <-chan *Bar) {
             out = make(<-chan *Bar)
             wg := sync.WaitGroup
             for i:=0; i < N; i++ {  // Number of goroutines in parallel
                   wg.Add(1)
                   go func(){
                        defer wg.Done()
                        for e := range in {
                            time.Sleep(5*time.Seconds)
                            out <- e
                        }
                    }()
              }
              go func(){}
                   wg.Wait()
                   close(out)
               }()
               return out
          }
        

        【讨论】:

          【解决方案4】:

          你可以使用time.Ticker:

          func fooStage(inChan <- chan *Bar) (<- chan *Bar) {
              //... some code
              ticker := time.NewTicker(5 * time.Second)
              <-ticker // the delay, probably need to call twice
              ticker.Stop()
              close(ticker.C)
              //... rest code
          }
          

          【讨论】:

          • 你能解释一下它是如何与后续事件一起工作的吗?如果两个事件通过 inChan 进来,第二个事件不会等待 10 秒吗?
          • &lt;-ticker 的返回频率不应超过指定的时间段。试试吧
          【解决方案5】:

          您可以使用另一个通道来限制您的循环能够创建的活动 goroutine 的数量。

          const numRoutines = 10
          
          func fooStage(inChan <-chan *Bar) <-chan *Bar {
              out := make(chan *Bar, 10000)
              routines := make(chan struct{}, numRoutines)
              go func() {
                  defer close(out)
                  wg := sync.WaitGroup{}
                  for {
                      select {
                      case event, ok := <-inChan:
                          if !ok {
                              // inChan closed
                              break
                          }
                          wg.Add(1)
                          routines <- struct{}{}
                          go func() {
                              time.Sleep(5 * time.Second)
                              out <- event
                              wg.Done()
                              <-routines
                          }()
                      }
                  }
                  wg.Wait()
              }()
              return out
          }
          

          【讨论】:

          • 谢谢,这似乎是个好主意。我能看到的唯一缺点是如果routines频道被阻塞,事件会延迟5秒以上。我想在事件中没有时间戳的情况下没有解决这个问题的好方法。
          • @ultimoo 由于 5 秒的等待,您可以轻松运行数百或数千个 goroutine,这将减少实际的事件等待时间。但是,仅通过阅读代码很难确定这样的事情。需要进行测试和基准测试才能真正确定事情的实际运行方式。
          • 当然,这更像是一个实验。我的直觉是,运行几千个这样的 goroutine 应该没问题——因为它们所做的只是运行 time.Sleep(),所以它们的大部分生命周期都不会被安排在处理器上。
          • 您现在在代码中的唯一保证是事件将至少延迟 5 秒。如果您需要它们在特定时间范围内执行,您需要一种不同的方法 - 您还需要在生产系统上随时间对它进行基准测试,使其处于与您的应用程序一起运行的状态(或至少接近它的状态)。
          猜你喜欢
          • 2019-03-03
          • 2020-05-29
          • 2022-11-11
          • 1970-01-01
          • 2020-01-07
          • 1970-01-01
          • 2020-05-08
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多