【问题标题】:Celery SQS + Duplication of tasks + SQS visibility timeoutCelery SQS + 重复任务 + SQS 可见性超时
【发布时间】:2017-05-09 02:04:34
【问题描述】:

我的大多数 Celery 任务的 ETA 都比 Amazon SQS 定义的最大可见性超时时间长。

芹菜documentation 说:

这会导致 ETA/倒计时/重试任务出现问题 执行超过可见性超时;事实上,如果发生这种情况 将再次执行,并在循环中再次执行。

因此您必须增加可见性超时以匹配 您计划使用的最长 ETA。

同时它还说:

在撰写本文时,AWS 支持的最大可见性超时为 12 小时(43200 秒):

如果我使用 SQS,我应该怎么做才能避免在工作人员中多次执行任务?

【问题讨论】:

  • 我假设你使用 acks_late。也许您应该有一些您希望运行时间超过 12 小时的任务才能拥有acks_late=False。你试过吗?您不必全局禁用 acks_late(配置设置),只需将其放在任务注释中即可。

标签: python celery


【解决方案1】:

一般来说,任务的 ETA 很长并不是一个好主意。

首先,存在“visibility_timeout”问题。而且你可能不想要一个非常大的可见性超时,因为如果工作人员在任务即将运行前 1 分钟崩溃,那么队列仍将等待可见性超时完成,然后再将任务发送给另一个工作人员,我猜你不想要这还有 1 个月。

来自 celery 文档:

请注意,Celery 将在工作人员关闭时重新传递消息,因此有 长时间的可见性超时只会延迟“丢失”的重新交付 断电或强制终止时的任务 工人。

而且,SQS 只允许列表中的许多任务被确认。

SQS 将这些任务称为“Inflight Messages”。来自http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html

收到来自 消费者排队,但尚未从队列中删除。

对于标准队列,最多可以有 120,000 个机上 每个队列的消息。如果您达到此限制,Amazon SQS 将返回 超限错误消息。为避免达到极限,您应该 处理完消息后从队列中删除消息。你也可以 增加用于处理消息的队列数量。

对于 FIFO 队列,最多可以有 20,000 条正在进行的消息 每个队列。如果您达到此限制,Amazon SQS 不会返回错误 消息。

我看到了两种可能的解决方案,您可以改用 RabbitMQ,它不依赖于可见性超时(如果您不想自己管理,可以使用“RabbitMQ 即服务”服务),或者将代码更改为具有非常小的 ETA (最佳实践)

这是我的 2 美分,也许@asksol 可以提供一些额外的见解。

【讨论】:

  • 我之前的设置是可见性超时 = 5 分钟。创建任务后,它被添加到执行队列中(假设在 6 小时内使用 ETA)。接下来发生的事情让我很惊讶。在日志中,我看到每五分钟将新任务添加到服务器上的队列中。而且我怀疑所有收集到的任务都会在6小时内一个一个执行。这就是我决定增加可见性超时的原因。是的,如果工作人员崩溃,重新交付的任务将迟到,但工作人员日志中至少会有 1 个任务,而不是 1000 个。如果我的想法不正确,请纠正我。
  • 预计每约 5 分钟会看到一次新任务,因为这是可见性超时。这意味着,如果工作人员在 5 分钟内没有开始执行此任务,则 SQS 认为工作人员错过了该任务,因此 SQS 将重新安排它以将其发送给另一个工作人员。增加可见性超时是解决此问题的一种方法,尽管它也有其自身的权衡。
  • 这里我有两个问题。首先,使用 SQS,除了日志,我没有任何其他方法来控制队列。所以我假设队列日志中列出的所有任务都将被执行。这是真的吗?还是 Celery 会在执行前检查任务的 ID?第二个:现在我在一台服务器上只有一个队列。如果 SQS 代理将相同的任务发送给不同服务器上的工作人员会发生什么?你怎么看,会不会被执行两次?
  • 我没有使用过 SQS,也没有看过 SQS 日志,所以无法回答。但是,如果一个队列将一个任务发送给两个工作人员,那么该任务将被执行两次。
  • 谢谢!我刚刚为队列创建了一个测试,以检查该任务是否会被执行多次,看起来工人收到相同消息的次数并不重要。
【解决方案2】:

Celery 以异步任务调度程序而闻名。与任务的数量无关。如果将任务发送到队列,celery 将执行任务,直到代码出现错误。在将任务发送到队列之前,您必须检查或限制重复任务。

【讨论】:

    【解决方案3】:

    在 SQS 中,您可以更改消息的可见性时间。它记录在here。所以你要做的是,当你处理消息时,你可以定期更新可见性超时,完成后你可以删除消息。

    要定期延长可见性超时,如果您正在使用某个循环,则可以在每次迭代结束时或每 x 次迭代时继续延长超时,具体取决于完成一次迭代的时间。这是执行我的意思的示例代码。

    process_message(){
      for(i=0;i++;..){
        .
        .
        .
        if(i%5 == 0){
         extendVisibilityTimeOut(..)
        }
      }
    }
    

    【讨论】:

    • 问题提到了这一点。问题是可见性超时限制为 12 小时。
    • 这就是为什么我提到“您可以定期更新可见性超时”。所以如果代码有一些for循环,他可以在每次迭代结束时继续延长超时时间。
    猜你喜欢
    • 1970-01-01
    • 2021-06-01
    • 2015-04-27
    • 1970-01-01
    • 1970-01-01
    • 2020-06-10
    • 1970-01-01
    • 2021-12-01
    • 2018-11-29
    相关资源
    最近更新 更多