您可以使用为此类事情创建的context 包(“带有截止日期,取消信号......”)。
您创建一个能够使用context.WithCancel() 发布取消信号的上下文(父上下文可能是context.Background() 返回的那个)。这将为您返回一个 cancel() 函数,该函数可用于取消(或更准确地说表示取消意图)到工作 goroutine。
在worker goroutines中,你必须检查是否已经启动了这种意图,通过检查Context.Done()返回的通道是否关闭,最简单的方法是尝试从它接收(如果它关闭则立即进行)。并且要进行非阻塞检查(如果它没有关闭,您可以继续),使用带有 default 分支的 select 语句。
我将使用下面的work() 实现,它模拟10% 的失败几率,并模拟1 秒的工作:
func work(i int) (int, error) {
if rand.Intn(100) < 10 { // 10% of failure
return 0, errors.New("random error")
}
time.Sleep(time.Second)
return 100 + i, nil
}
doAllWork() 可能看起来像这样:
func doAllWork() error {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
result, err := work(j)
if err != nil {
fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
}
}(i)
}
wg.Wait()
return ctx.Err()
}
这是如何测试的:
func main() {
rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
fmt.Printf("doAllWork: %v\n", doAllWork())
}
输出(在Go Playground上试试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
如果没有错误,例如使用以下work() 函数时:
func work(i int) (int, error) {
time.Sleep(time.Second)
return 100 + i, nil
}
输出会是这样的(在Go Playground 上试试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
注意事项:
基本上我们只使用了上下文的Done() 频道,所以看起来我们可以很容易(如果不是更简单的话)使用done 频道而不是Context,关闭频道来做什么@ 987654347@在上述解决方案中。
这不是真的。 这只能在只有一个 goroutine 可以关闭通道的情况下使用,但在我们的例子中,任何工作人员都可以这样做。 并且尝试关闭已经关闭的通道恐慌(请参阅此处的详细信息:How does a non initialized channel behave? )。因此,您必须确保围绕close(done) 进行某种同步/排除,这将使其可读性降低,甚至更加复杂。实际上,这正是 cancel() 函数在幕后所做的,隐藏/抽象出你的眼睛,所以cancel() 可能会被多次调用以使你的代码/使用它更简单。
如何从工作人员那里获取和返回错误?
为此,您可以使用错误通道:
errs := make(chan error, 2) // Buffer for 2 errors
当遇到错误时,在工作人员内部,将其发送到通道而不是打印它:
result, err := work(j)
if err != nil {
errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
在循环之后,如果有错误,则返回(否则返回nil):
// Return (first) error, if any:
if ctx.Err() != nil {
return <-errs
}
return nil
这次输出(在Go Playground上试试这个):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
请注意,我使用了一个缓冲通道,其缓冲区大小等于工作人员的数量,这确保了在其上发送始终是非阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个(例如第一个)。另一种选择可能是使用缓冲通道仅保存 1,并对其进行非阻塞发送,如下所示:
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
// Non-blocking send:
select {
case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
default:
}
cancel()
return
}