【问题标题】:Check if all goroutines have finished without using wg.Wait()在不使用 wg.Wait() 的情况下检查所有 goroutine 是否已完成
【发布时间】:2021-08-26 13:22:55
【问题描述】:

假设我有一个函数 IsAPrimaryColour(),它通过调用其他三个函数 IsRed()IsGreen() 来工作>IsBlue()。由于这三个功能彼此完全独立,因此它们可以同时运行。返回条件为:

  1. 如果三个函数中的任何一个返回 true,IsAPrimaryColour() 也应该返回 true。无需等待另一个 完成的功能。即:如果 IsRed()trueIsGreen()IsPrimaryColour()true /em> 是 trueIsBlue()true
  2. 如果所有函数都返回 false,IsAPrimaryColour() 也应该返回 错误的。即:如果 IsRed()false AND IsGreen()IsPrimaryColour()false /em> 是 false 并且 IsBlue()false
  3. 如果三个函数中的任何一个返回错误,IsAPrimaryColour() 也应该返回错误。无需等待另一个 完成或收集任何其他错误的函数。

我正在努力解决的问题是,如果任何其他三个函数返回 true,如何退出该函数,但如果它们都返回 false,则还要等待所有三个函数都完成。如果我使用 sync.WaitGroup 对象,我需要等待所有 3 个 go 例程完成,然后才能从调用函数返回。

因此,我使用循环计数器来跟踪我在频道上收到消息的次数以及在收到所有 3 条消息后存在的程序。

https://play.golang.org/p/kNfqWVq4Wix

package main

import (
    "errors"
    "fmt"
    "time"
)

func main() {
    x := "something"
    result, err := IsAPrimaryColour(x)

    if err != nil {
        fmt.Printf("Error: %v\n", err)
    } else {
        fmt.Printf("Result: %v\n", result)
    }
}

func IsAPrimaryColour(value interface{}) (bool, error) {
    found := make(chan bool, 3)
    errors := make(chan error, 3)
    defer close(found)
    defer close(errors)
    var nsec int64 = time.Now().UnixNano()

    //call the first function, return the result on the 'found' channel and any errors on the 'errors' channel
    go func() {
        result, err := IsRed(value)
        if err != nil {
            errors <- err
        } else {
            found <- result
        }
        fmt.Printf("IsRed done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
    }()

    //call the second function, return the result on the 'found' channel and any errors on the 'errors' channel
    go func() {
        result, err := IsGreen(value)
        if err != nil {
            errors <- err
        } else {
            found <- result
        }
        fmt.Printf("IsGreen done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
    }()

    //call the third function, return the result on the 'found' channel and any errors on the 'errors' channel
    go func() {
        result, err := IsBlue(value)
        if err != nil {
            errors <- err
        } else {
            found <- result
        }
        fmt.Printf("IsBlue done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
    }()

    //loop counter which will be incremented every time we read a value from the 'found' channel
    var counter int

    for {
        select {
        case result := <-found:
            counter++
            fmt.Printf("received a value on the results channel after %f nanoseconds. Value of counter is %d\n", float64(time.Now().UnixNano()-nsec), counter)
            if result {
                fmt.Printf("some goroutine returned true\n")
                return true, nil
            }
        case err := <-errors:
            if err != nil {
                fmt.Printf("some goroutine returned an error\n")
                return false, err
            }
        default:
        }

        //check if we have received all 3 messages on the 'found' channel. If so, all 3 functions must have returned false and we can thus return false also
        if counter == 3 {
            fmt.Printf("all goroutines have finished and none of them returned true\n")
            return false, nil
        }
    }
}

func IsRed(value interface{}) (bool, error) {
    return false, nil
}

func IsGreen(value interface{}) (bool, error) {
    time.Sleep(time.Millisecond * 100) //change this to a value greater than 200 to make this function take longer than IsBlue()
    return true, nil
}

func IsBlue(value interface{}) (bool, error) {
    time.Sleep(time.Millisecond * 200)
    return false, errors.New("something went wrong")
}

虽然这工作得很好,但我想知道我是否没有忽略一些语言功能以更好地做到这一点?

【问题讨论】:

  • 空的default 将使其成为一个消耗整个内核的快速循环。
  • 从包含结果和错误的 goroutine 中返回一个结构。然后你可以简单地从频道中读取三遍。
  • 您的要求我不清楚。 “如果任何其他三个函数返回 true,如何退出该函数,但如果它们都返回 false,则还要等待所有三个函数完成”听起来很矛盾。听起来您想退出 您想同时等待(不退出)?你能阐明你的目标吗?
  • 正确的解决方案是context.Context。我确定我已经看到了相关的答案...让我搜索一下。

标签: go concurrency waitgroup


【解决方案1】:

errgroup.WithContext 可以帮助简化这里的并发。

如果发生错误或找到结果,您希望停止所有 goroutine。如果您可以将“找到结果”表示为一个明显的错误(沿着io.EOF 的行),那么您可以使用errgroup 的内置“第一个错误取消”行为来关闭整个组:

func IsAPrimaryColour(ctx context.Context, value interface{}) (bool, error) {
    var nsec int64 = time.Now().UnixNano()

    errFound := errors.New("result found")
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        result, err := IsRed(ctx, value)
        if result {
            err = errFound
        }
        fmt.Printf("IsRed done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
        return err
    })

    …

    err := g.Wait()
    if err == errFound {
        fmt.Printf("some goroutine returned errFound\n")
        return true, nil
    }
    if err != nil {
        fmt.Printf("some goroutine returned an error\n")
        return false, err
    }
    fmt.Printf("all goroutines have finished and none of them returned true\n")
    return false, nil
}

(https://play.golang.org/p/MVeeBpDv4Mn)

【讨论】:

  • 感谢您的回答。上下文确实似乎是编写此类函数的“正确” Go 方式。它在我的特定情况下不起作用,因为我正在使用的函数来自外部库,我无法编辑供应商的代码来处理取消。但是无论如何学习这个非常有趣!
  • 如果您无法编辑供应商的代码来处理取消,那么提前返回是不安全的:如果任何操作停止,您的程序将有无限的内存消耗。但是您仍然可以使用errgroup.Group 来简化聚合步骤:play.golang.org/p/Xx_tx_bHhaO
【解决方案2】:

一些评论,

  • 您不需要关闭通道,您事先知道要读取的信号的预期计数。这足以满足退出条件。
  • 您不需要重复手动函数调用,使用切片。
  • 既然你使用了切片,你甚至不需要计数器,或者静态值 3,只要看看你的 func 切片的长度。
  • 切换到默认大小写是没有用的。只需阻止您正在等待的输入。

所以一旦你摆脱了所有的脂肪,代码看起来像


func IsAPrimaryColour(value interface{}) (bool, error) {
    fns := []func(interface{}) (bool, error){IsRed, IsGreen, IsBlue}
    found := make(chan bool, len(fns))
    errors := make(chan error, len(fns))

    for i := 0; i < len(fns); i++ {
        fn := fns[i]
        go func() {
            result, err := fn(value)
            if err != nil {
                errors <- err
                return
            }
            found <- result
        }()
    }

    for i := 0; i < len(fns); i++ {
        select {
        case result := <-found:
            if result {
                return true, nil
            }
        case err := <-errors:
            if err != nil {
                return false, err
            }
        }
    }
    return false, nil
}
  • 您不需要观察每个异步调用的时间,只需观察整个调用者返回所用的时间。
func main() {
    now := time.Now()
    x := "something"
    result, err := IsAPrimaryColour(x)

    if err != nil {
        fmt.Printf("Error: %v\n", err)
    } else {
        fmt.Printf("Result: %v\n", result)
    }
    fmt.Println("it took", time.Since(now))
}

https://play.golang.org/p/bARHS6c6m1c

【讨论】:

  • 在该示例中,如果任何结果为真,则后续 goroutine 可能会在任意长时间内泄漏(未被检测到)。 (回想一下,上下文取消是异步的。)对于非平凡的程序,这可能会导致负载下的内存不足故障、测试中的数据竞争、倾斜的基准测试等。
  • @bcmills 你能举例说明一下吗?我看不出它是如何泄漏的; play.golang.org/p/zWB9s6zj76l
  • 如果你说的是被调用函数异常缓慢的情况。嗯,是的,这可能会发生。然后,我觉得有必要注意传递上下文只是解决方案的一部分。有问题的函数很可能缺少上下文取消断言来提前返回。沿着这条路走下去,任何事情都可能发生,而且语言并没有为我们提供确保正确行为的工具。我在争论是因为我想了解你的观点......
  • “异常缓慢”的操作可能而且确实会发生在实际程序中,尤其是在异常负载下(例如在部分网络中断期间,或者如果服务因某种原因病毒传播)。如果发生这种情况,您希望程序优雅地降级,而不是内存不足和崩溃。优雅降级的最简单方法是将调用构造为同步的,而不是放弃运行中的异步操作。
  • 是的..?但是,如果我们一开始就教人们更健壮的模式,这并不是“重新学习”。 (为此,我在rethinking concurrency patterns in Go 上做了一个完整的演讲。)
【解决方案3】:

处理多个并发函数调用并在条件后取消任何未完成的方法的惯用方法是使用上下文值。像这样的:

func operation1(ctx context.Context) bool { ... }
func operation2(ctx context.Context) bool { ... }
func operation3(ctx context.Context) bool { ... }

func atLeastOneSuccess() bool {
    ctx, cancel := context.WithCancel(context.Background()
    defer cancel() // Ensure any functions still running get the signal to stop
    results := make(chan bool, 3) // A channel to send results
    go func() {
        results <- operation1(ctx)
    }()
    go func() {
        results <- operation2(ctx)
    }()
    go func() {
        results <- operation3(ctx)
    }()
    for i := 0; i < 3; i++ {
        result := <-results
        if result {
            // One of the operations returned success, so we'll return that
            // and let the deferred call to cancel() tell any outstanding
            // functions to abort.
            return true
        }
    }
    // We've looped through all return values, and they were all false
    return false
}

当然,这假设每个operationN 函数实际上都支持取消的上下文。 This answer 讨论如何做到这一点。

【讨论】:

  • 在该示例中,如果任何结果为 true,则后续的 goroutine 可能会在任意长时间内泄漏(未被检测到)。 (回想一下,Context 取消是异步的。)对于非平凡的程序,这可能会导致负载下的内存不足故障、测试中的数据竞争、倾斜的基准测试等。
  • 感谢您的回答。上下文确实似乎是编写此类函数的“正确” Go 方式。它在我的特定情况下不起作用,因为我正在使用的函数来自外部库,我无法编辑供应商的代码来处理取消。但是无论如何学习这个非常有趣!
  • 如果你不能中止操作,那么你只有两个选择:等到所有功能都返回,或者忽略一些还在运行的事实,让它们在后台继续运行,可能会消耗比您预期更多的资源。
【解决方案4】:

你不必阻塞Wait上的主goroutine,你可以阻塞别的东西,例如:

doneCh := make(chan struct{}{})

go func() {
    wg.Wait()
    close(doneCh)
}()

然后您可以在select 中等待doneCh,看看是否所有例程都已完成。

【讨论】:

  • 感谢您的回答。另一个问题虽然只是为了我的澄清,但我认为 wg.Wait() 只会在所有 go 例程完成执行时返回(或者更确切地说,当所有例程都减少了等待组计数器时)。使用我之前的示例,如果 IsRed() 返回 true,wg.Wait() 不会仍然等待 IsBlue() 和 IsGreen() 完成吗?
  • 是的。这就是为什么您要在 select 中使用它的原因 - 这样您就可以同时检查所有例程是否已完成,或者其中一个是否返回 true,或者其中一个是否返回错误。
  • 如果select 的其他分支之一返回,这种方法仍然会泄漏正在运行的goroutine——它简化了循环,但如果任何其他操作始终如一,仍然存在内存不足的风险摊位。
  • 谢谢@Adrian。我选择这个作为答案,因为这种方法不需要我修改函数。这对我特别有用,因为我正在使用的函数来自外部库,我无法编辑它们以添加上下文。
  • “除非它们包含缺陷”部分是一个繁重的任务:真正的程序通常确实包含缺陷。如果您一直等待 goroutine 完成,那么您可以在测试期间相当容易地检测到这些缺陷——当测试超时时,它们就在 goroutine 转储中。如果你不这样做,那么当你的程序在生产环境中出现 OOM 时,你就会试图筛选一个更大的核心转储。
猜你喜欢
  • 2010-12-16
  • 2011-10-18
  • 1970-01-01
  • 1970-01-01
  • 2016-08-24
  • 2014-07-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多