【问题标题】:Close multiple goroutine if an error occurs in one in gogo中一个发生错误,关闭多个goroutine
【发布时间】:2018-01-12 00:57:51
【问题描述】:

考虑这个函数:

func doAllWork() error {

    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {

        wg.add(1)
        go func() {

            defer wg.Done()
            for j := 0; j < 10; j++ {
                result, err := work(j)
                if err != nil {
                    // can't use `return err` here
                    // what sould I put instead ? 
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()

    return nil
}

在每个 goroutine 中,函数 work() 被调用 10 次。如果对 work() 的调用在任何正在运行的 goroutine 中返回错误,我希望所有 goroutine 立即停止,并退出程序。 可以在这里使用os.Exit() 吗?我该如何处理?


编辑:这个问题与how to stop a goroutine 不同,因为这里如果发生错误,我需要关闭所有goroutines

【问题讨论】:

  • 如果你只想关闭整个程序也没关系。如果您想将效果隔离为仅关闭生成的 goroutines,您可以使用通道来指示所有 goroutines 停止执行。唯一的变化是 goroutine 将在退出之前完成当前正在执行的 work() 的执行。
  • how to stop a goroutine的可能重复
  • @Flimzy 这不是重复项,因为在标记的重复项中,只有一个 goroutine(控制 goroutine)可以发起取消,但在这个问题中,多个 goroutine(任何工人)可以这样做。那里应用的技术不能直接应用在这里(无需额外工作)。
  • 顺便说一句,我认为应该是wg.Add(2),因为for循环最多会执行两次。

标签: go error-handling synchronization exit goroutine


【解决方案1】:

一个更明确的方法是使用errgroup (documentation)。

errgroup为处理公共任务的子任务的goroutine组提供同步、错误传播和上下文取消。

您可以在此示例中查看 (playground):

    var g errgroup.Group
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }

    for _, url := range urls {
        // Launch a goroutine to fetch the URL.
        url := url // https://golang.org/doc/faq#closures_and_goroutines
        
       g.Go(func() error {
            // Fetch the URL.
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }
   
    // Wait for all HTTP fetches to complete.
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully fetched all URLs.")
    
    } else {

        // After all have run, at least one of them has returned an error!
       // But all have to finish their work!
       // If you want to stop others goroutines when one fail, go ahead reading!
        fmt.Println("Unsuccessfully fetched URLs.")
    }

但是注意:Go documentation 中的The first call to return a non-nil error cancels the group 短语有点误导。

其实errgroup.Groupif创建了一个上下文(WithContext函数),当组内的goroutine返回一个时,会调用WithContext返回的上下文的取消函数错误,否则什么都不做(read the source code here!)。

所以,如果你想关闭不同的 goroutine,你必须使用我的 WithContext 返回的上下文并在它们内部自己管理它,errgroup 将关闭该上下文! Here you can find an example.

总而言之,errgroup 可以以不同的方式使用,如 examples 所示。

  1. “只是错误”,如上例: Wait 等待所有 goroutine 结束,然后返回第一个非 nil 错误(如果有),或者返回 nil

  2. 并行: 您必须使用WithContext function 创建组并使用上下文来管理上下文关闭。 I created a playground example here with some sleeps! 您必须手动关闭每个 goroutine,但使用上下文您可以在关闭上下文时结束它们。

  3. 管道(在examples 中查看更多信息)。

【讨论】:

  • 嗨,我有一个问题要问你,我稍微修改了你的示例play.golang.org/p/ky2DkCZBkyR,如果我理解正确,所有 goroutines (2,3) 应该在 goroutine (4) 返回后立即取消错误。但这并没有发生,goroutines (2,3) 仍然有效,直到它们结束。我错过了什么?
  • 嗨@semafor!你让我想到了这个:)谢谢!我编辑了我的答案,我希望它现在是正确的并且更清楚!如果你现在明白了,请告诉我!
【解决方案2】:

另一种方法是使用errgroup.WithContext。您可以在此example 中查看。

简而言之,g.Wait() 等待第一个错误发生或所有错误都完成。当任何 goroutine 发生错误(在提供的示例中为超时)时,它会通过 ctx.Done() 通道取消其他 goroutine 中的执行。

【讨论】:

  • 然而,使用这个上下文的 goroutine spin 不接受参数:(
【解决方案3】:

您可以使用为此类事情创建的context 包(“带有截止日期,取消信号......”)。

您创建一个能够使用context.WithCancel() 发布取消信号的上下文(父上下文可能是context.Background() 返回的那个)。这将为您返回一个 cancel() 函数,该函数可用于取消(或更准确地说表示取消意图)到工作 goroutine。
在worker goroutines中,你必须检查是否已经启动了这种意图,通过检查Context.Done()返回的通道是否关闭,最简单的方法是尝试从它接收(如果它关闭则立即进行)。并且要进行非阻塞检查(如果它没有关闭,您可以继续),使用带有 default 分支的 select 语句。

我将使用下面的work() 实现,它模拟10% 的失败几率,并模拟1 秒的工作:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% of failure
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

doAllWork() 可能看起来像这样:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Make sure it's called to release resources even if no errors

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // Check if any error occurred in any other gorouties:
                select {
                case <-ctx.Done():
                    return // Error somewhere, terminate
                default: // Default is must to avoid blocking
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}

这是如何测试的:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
    fmt.Printf("doAllWork: %v\n", doAllWork())
}

输出(在Go Playground上试试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled

如果没有错误,例如使用以下work() 函数时:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}

输出会是这样的(在Go Playground 上试试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

注意事项:

基本上我们只使用了上下文的Done() 频道,所以看起来我们可以很容易(如果不是更简单的话)使用done 频道而不是Context,关闭频道来做什么@ 987654347@在上述解决方案中。

这不是真的。 这只能在只有一个 goroutine 可以关闭通道的情况下使用,但在我们的例子中,任何工作人员都可以这样做。 并且尝试关闭已经关闭的通道恐慌(请参阅此处的详细信息:How does a non initialized channel behave? )。因此,您必须确保围绕close(done) 进行某种同步/排除,这将使其可读性降低,甚至更加复杂。实际上,这正是 cancel() 函数在幕后所做的,隐藏/抽象出你的眼睛,所以cancel() 可能会被多次调用以使你的代码/使用它更简单。

如何从工作人员那里获取和返回错误?

为此,您可以使用错误通道:

errs := make(chan error, 2) // Buffer for 2 errors

当遇到错误时,在工作人员内部,将其发送到通道而不是打印它:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
    cancel()
    return
}

在循环之后,如果有错误,则返回(否则返回nil):

// Return (first) error, if any:
if ctx.Err() != nil {
    return <-errs
}
return nil

这次输出(在Go Playground上试试这个):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

请注意,我使用了一个缓冲通道,其缓冲区大小等于工作人员的数量,这确保了在其上发送始终是非阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个(例如第一个)。另一种选择可能是使用缓冲通道仅保存 1,并对其进行非阻塞发送,如下所示:

errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:

result, err := work(j)
if err != nil {
    // Non-blocking send:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
    default:
    }
    cancel()
    return
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-12-08
    • 1970-01-01
    • 1970-01-01
    • 2021-08-15
    • 2013-03-20
    • 1970-01-01
    • 2011-05-26
    • 2023-03-15
    相关资源
    最近更新 更多