【问题标题】:How to get list of scheduled tasks and their arguments with Celery?如何使用 Celery 获取计划任务列表及其参数?
【发布时间】:2021-08-22 13:30:02
【问题描述】:

我想在创建新任务之前检查之前的计划任务以防止重复。如何使用 python 和 celery 获取所有计划任务及其参数的列表?

【问题讨论】:

    标签: python celery


    【解决方案1】:

    my_proj/celery.py

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery("my_proj")
    
    app.conf.update(
        imports=["task"],
        timezone="UTC",
        beat_schedule={
            "task.common.add": {
                "task": "task.common.add",
                "schedule": crontab(),
                'args': (1, 2),
            },
            "task.common.mul": {
                "task": "task.common.mul",
                "schedule": crontab(minute=0, hour=0),
                'args': (3,),
                'kwargs': {'y': 4},
            },
        },
    )
    

    task/common.py

    from my_proj.celery import app  # Use @shared_task if in Django
    
    @app.task
    def add(x, y):
        print(f"{x}, {y}, {x + y}")
        return x + y
    
    
    @app.task
    def mul(x, y):
        print(f"{x}, {y}, {x * y}")
        return x * y
    

    选项 1:

    >>> from my_proj.celery import app
    >>> app.conf.beat_schedule
    {'task.common.add': {'task': 'task.common.add', 'schedule': <crontab: * * * * * (m/h/d/dM/MY)>, 'args': (1, 2)}, 'task.common.mul': {'task': 'task.common.mul', 'schedule': <crontab: 0 0 * * * (m/h/d/dM/MY)>, 'args': (3,), 'kwargs': {'y': 4}}}
    

    选项 2:

    如果使用文件调度程序,例如celery.beat.PersistentScheduler(默认)其中writes to a local shelve database file 名称为celerybeat-schedule

    • 警告:如果调度程序当前正在运行,这可能不起作用,因为文件将被锁定且无法读取。
    >>> import shelve
    >>> with shelve.open('celerybeat-schedule') as schedule:
    ...     print(schedule['entries'])
    ... 
    {'task.common.mul': <ScheduleEntry: task.common.mul task.common.mul(3, y=4) <crontab: 0 0 * * * (m/h/d/dM/MY)>, 'task.common.add': <ScheduleEntry: task.common.add task.common.add(1, 2) <crontab: * * * * * (m/h/d/dM/MY)>, 'celery.backend_cleanup': <ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>}
    

    选项 3:

    如果使用数据库调度程序,例如django-celery-beatcelery-sqlalchemy-scheduler,查询数据库记录即可。所以如果使用 django-celery-beat,那就是:

    >>> from django_celery_beat.models import PeriodicTask, PeriodicTasks
    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) UTC>, <PeriodicTask: task.common.add: * * * * * (m/h/dM/MY/d) UTC>, <PeriodicTask: task.common.mul: 0 0 * * * (m/h/dM/MY/d) UTC>]>
    >>> PeriodicTask.objects.values('name', 'task', 'args', 'kwargs')
    <ExtendedQuerySet [{'name': 'celery.backend_cleanup', 'task': 'celery.backend_cleanup', 'args': '[]', 'kwargs': '{}'}, {'name': 'task.common.add', 'task': 'task.common.add', 'args': '[1, 2]', 'kwargs': '{}'}, {'name': 'task.common.mul', 'task': 'task.common.mul', 'args': '[3]', 'kwargs': '{"y": 4}'}]>
    

    【讨论】:

      猜你喜欢
      • 2012-04-19
      • 2011-07-18
      • 2013-09-09
      • 2017-09-22
      • 2017-09-08
      • 2021-04-19
      • 2011-07-26
      • 2014-03-17
      • 1970-01-01
      相关资源
      最近更新 更多