【问题标题】:Go queue processing with retry on failure进行队列处理并重试失败
【发布时间】:2017-02-23 05:44:44
【问题描述】:

我们有一堆文件要在处理后上传到远程 blob 存储。

目前,前端 (PHP) 会创建此类文件的 redis 列表并为其提供唯一 ID,称为 JobID。然后,它将唯一 ID 传递给 beanstalk 管,由 Go 进程接收。它使用一个名为 Go workers 的库以 net/http 的方式处理每个作业 ID。它接收作业 ID,检索 redis 列表并开始处理文件。

但是,目前一次只处理一个文件。由于这里的操作是 I/O 限制的,而不是 CPU 限制的,直觉表明每个文件使用一个 goroutine 是有益的。

但是,我们希望在失败时重试上传,并跟踪每个作业处理的项目数。我们不能启动无限数量的 goroutine,因为单个 Job 可以包含大约 10k 个要处理的文件,并且在高峰时间每秒可以发送 100 个这样的 Job。正确的方法是什么?

注意:如果需要,我们可以稍微改变一下技术栈(比如用 beanstalkd 换一些东西)

【问题讨论】:

    标签: asynchronous go redis beanstalkd


    【解决方案1】:

    您可以通过使用缓冲的chan 来限制 goroutine 的数量,其大小为您想要的最大 goroutine 数量。然后,如果此 chan 达到最大容量,您可以阻止它。当你的 goroutine 完成时,它们将释放槽以允许新的 goroutine 运行。

    例子:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        concurrent    = 5
        semaphoreChan = make(chan struct{}, concurrent)
    )
    
    func doWork(wg *sync.WaitGroup, item int) {
        // block while full
        semaphoreChan <- struct{}{}
    
        go func() {
            defer func() {
                // read to release a slot
                <-semaphoreChan
                wg.Done()
            }()
            // This is where your work actually gets done
            fmt.Println(item)
        }()
    }
    
    func main() {
        // we need this for the example so that we can block until all goroutines finish
        var wg sync.WaitGroup
        wg.Add(10)
    
        // start the work
        for i := 0; i < 10; i++ {
            doWork(&wg, i)
        }
    
        // block until all work is done
        wg.Wait()
    }
    

    去游乐场链接:https://play.golang.org/p/jDMYuCe7HV

    受此 Golang UK 会议演讲的启发:https://youtu.be/yeetIgNeIkc?t=1413

    【讨论】:

    • 它帮助我开始限制并发。但是,现在仍然存在的问题是如何跟踪工作的成功或失败。一个作业包含 N 个子任务,所有子任务都必须处理成功,否则需要报告错误。我该如何解决这个问题?
    • 创建一个传递给 goroutine 的通道。 goroutine 可以将操作的结果(包括错误)写入该通道。调用者可以根据需要从该通道中提取信息来处理错误(例如,记录错误或重试操作)。如果您需要重试操作,请使用具有重试所需上下文(例如,goroutine 需要重试的输入)和错误的自定义结构类型创建通道。
    猜你喜欢
    • 2012-07-11
    • 1970-01-01
    • 1970-01-01
    • 2012-08-03
    • 2018-07-10
    • 2013-02-16
    • 2014-12-11
    • 2019-05-23
    • 2012-10-19
    相关资源
    最近更新 更多