【问题标题】:Sending .delay() and periodic_task for Celery task functions to different queues将 Celery 任务函数的 .delay() 和periodic_task 发送到不同的队列
【发布时间】:2019-04-01 07:35:27
【问题描述】:

我有一个代码库,其中包含几个应用程序,每个应用程序都有 tasks.py,并且总共有 100 个这样的函数

@periodic_task(run_every=crontab(minute='20'))
def sync_calendar_availability_and_prices(listing_id_list=None, reapply_rules_after_sync=False):

它采用 celery 定期任务定义的旧格式,但在 celery==4.1 上运行良好。

这些通过 beat 每隔几个小时或几分钟执行一次,我也使用 .delay() 在代码库中将它们称为 ad-hoc。我希望所有 .delay() 调用都进入某个 celery 队列manual_call_queue 并定期节拍触发调用相同功能的调用去periodic_beat_fired_queue - 这是一个简单的 1-2 行配置更改在全局级别的某个地方这样做?

我使用 rabbitmq、celery、django 和 django-celery-beat

【问题讨论】:

    标签: django rabbitmq celery django-celery celerybeat


    【解决方案1】:

    要将周期性任务发送到特定队列,请发送队列/选项 arg。

    @periodic_task(run_every=crontab(minute='20'), queue='manual_call_queue', options={'queue': 'periodic_beat_fired_queue'})
    def sync_calendar_availability_and_prices(listing_id_list=None, reapply_rules_after_sync=False):
    

    queue='manual_call_queue' 在使用 .delay 或 .apply_async 调用任务时使用

    options={'queue': 'periodic_beat_fired_queue'} 用于 celery beat 调用任务时使用。

    【讨论】:

    • 谢谢,这太好了 ChillarAnand,有没有办法自动对 celery 在开始时扫描的所有任务执行此操作(来自 django 项目)——我想要相同的完全相同的双重队列所有任务,当@periodic_tasks 的扫描发生时应该有一种设置方法吗?
    • 或者我可以编写一个自定义装饰器 my_periodic_task 并在其中执行此队列定义并在我的自定义装饰器中调用periodic_task?
    • 是的。你可以这样做。 @dowjones123
    • 有没有办法检测被调用的任务函数内部是通过节拍还是非节拍方式调用的?
    猜你喜欢
    • 2013-06-02
    • 2019-01-08
    • 2017-11-18
    • 2019-07-26
    • 1970-01-01
    • 1970-01-01
    • 2022-07-29
    • 2012-04-22
    相关资源
    最近更新 更多