【问题标题】:Django celery task duplication: can't lock DB?Django celery 任务重复:无法锁定数据库?
【发布时间】:2016-06-25 13:13:06
【问题描述】:

我的 django 应用程序允许用户互相发送消息,我将一些最近的消息集中在一起,并使用 celery 和 redis 将它们发送到电子邮件中。

每次用户发送消息时,我都会向数据库添加一条消息,然后触发一个异步任务来汇集该用户在过去 60 秒内的消息并将它们作为电子邮件发送。

tasks.pushMessagePool.apply_async(args = (fromUser,), countdown = 60)

如果用户在接下来的 60 秒内发送了 5 条消息,那么我的假设是应该创建 5 个任务,但只有第一个任务发送电子邮件,其他 4 个任务什么都不做。我实现了一个简单的锁定机制,以确保只考虑一次消息并确保数据库锁定。

@shared_task
def pushMessagePool(fromUser, ignore_result=True):
    lockCode = randint(0,10**9)
    data.models.Messages.objects.filter(fromUser = fromUser, locked=False).update(locked=True, lockCode = lockCode)
    M = data.models.Messages.objects.filter(fromUser = fromUser, lockCode = lockCode)
    sendEmail(M,lockCode)

使用此设置,我仍然偶尔会收到 (~10%) 重复。副本将在 10 毫秒内相互触发,并且它们具有不同的 lockCode。

为什么这种锁定机制不起作用?芹菜是指旧的数据库快照吗?那没有任何意义。

【问题讨论】:

    标签: django redis celery django-celery


    【解决方案1】:

    Djangojack,这里有类似的问题吗?但是对于 SQS。我不确定它是否也适用于 Redis?

    创建 SQS 队列时,您需要设置默认可见性 超时时间大于您期望的最大时间 要运行的任务。这是 SQS 将使消息对所有人不可见的时间 交付给一位消费者后的其他消费者。我相信 默认为 30 秒。所以,如果一个任务需要超过 30 秒,SQS 将相同的消息传递给另一个消费者,因为它假设 第一个消费者死亡,没有完成任务。

    来自@gustavo-ambrozio 在this answer 上的评论。

    【讨论】:

    • Redis 的默认可见性超时时间是 1 小时,任务肯定短于 1 小时。但即使它在任务结束时死亡,为什么设置locked=True 不会阻止未来的任务处理相同的记录?
    猜你喜欢
    • 2019-05-25
    • 2011-06-19
    • 1970-01-01
    • 1970-01-01
    • 2011-07-17
    • 1970-01-01
    • 2011-11-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多