【问题标题】:Celery periodic_task in Django run once(!)Django 中的 Celery period_task 运行一次(!)
【发布时间】:2013-08-25 13:20:57
【问题描述】:

我正在使用 celery 和 Django 创建一个周期任务,我想每 X 秒运行一次。 该任务应该产生几个子任务,但我需要确保每个主任务只产生一组子任务。

这就是我所拥有的......

@periodic_task(run_every=datetime.timedelta(seconds=2))
def initialize_new_jobs():
    for obj in Queue.objects.filter(status__in=['I', 'Q']):
        obj = Queue.objects.get(id=obj.id)
        if obj.status not in ['I', 'Q']:
            continue
        obj.status = 'A'
        obj.save()
        create_other_task.delay(obj.id)

这有点工作,但感觉不对。我必须在循环开始时刷新 obj 以确保另一个正在运行的periodic_task 不会在同一个 Queue 对象上发出 create_other_task。

有没有更好的方法来做这种工作?基本上,我想尽可能频繁地执行 create_other_task,但每个状态为 I 或 Q 的 Queue 对象只能执行一次。

这是我的问题的简化版本,所以请忽略我可以在创建 Queue 对象时只运行 create_other_task 的事实,而不是运行周期性任务 :)

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    你可以使用交易:

    @periodic_task(run_every=datetime.timedelta(seconds=2))
    @transaction.commit_on_success
    def initialize_new_jobs():
        for obj in Queue.objects.select_for_update().filter(status__in=['I', 'Q']):
            obj.status = 'A'
            obj.save()
            create_other_task.delay(obj.id)
    

    select_for_update() 对行设置排他锁,以便其他用户在尝试读取值时被阻止。在事务提交或回滚后释放锁。 Reference.

    这样您可以确定obj 的状态为IQ,并且obj.save() 将正常工作。

    【讨论】:

      猜你喜欢
      • 2023-03-10
      • 2018-09-10
      • 1970-01-01
      • 2014-07-27
      • 2023-03-24
      • 2020-07-25
      • 2017-09-11
      • 2017-11-25
      • 1970-01-01
      相关资源
      最近更新 更多