【问题标题】:How can I get revoked flag for current task in Celery worker?如何获得 Celery worker 中当前任务的撤销标志?
【发布时间】:2015-05-06 08:50:44
【问题描述】:

我想在 Celery + RabbitMQ 代理上通过清理过程实现任务取消。 如何在 Celery worker 中获取当前任务的“已撤销”状态?

# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
  for i in range(0, 10):
    time.sleep(1)
    # I want check here for cleanup.
  return x + y

# caller.py
from tasks import add
result = add.delay(4, 4)
result.revoke()

Celery 支持Abortable tasks,但它只适用于数据库后端。

Python 3.4.1 / Celery 3.1.17 / RabbitMQ 3.4.4

【问题讨论】:

    标签: python celery


    【解决方案1】:

    看看scheduled_tasks,你可以问celery你的任务是否计划运行。

    前:

        import celery
        celery_inspect = celery.current_app.control.inspect()
        celery_inspect.registered_tasks()
    

    此方法返回一个包含所有 celery 计划任务的字典。

    【讨论】:

    • 那么,我该怎么做才能撤销该任务?
    • 如果你想撤销任务,只需这样做:from celery.task import controlcontrol.revoke({celery_task_id})
    • 谢谢,但我想知道“我(=task)是否被 Celery 框架撤销了?”,而不是“撤销某些特定任务”。
    • 我从未尝试过,但我认为它有效:import celery celery_inspect = celery.current_app.control.inspect()celery_inspect.revoked()。它应该返回一个包含所有已撤销任务的列表。
    【解决方案2】:

    Felippe Da Motta Raposo 的建议适用于我的自定义任务:

    from celery import Task
    from celery.task.control import inspect
    
    WORKER_NAME = "celery@server"
    inspector = inspect([WORKER_NAME])
    
    class CustomTask(Task):
        def _is_revoked(self):
            revoked_list = inspector.revoked()
            return (revoked_list and self.task_id in revoked_list[WORKER_ADDRESS]
    
        def run(self, *args, **kwargs):
            self.task_id = self.request.id
    

    【讨论】:

      猜你喜欢
      • 2018-02-11
      • 1970-01-01
      • 1970-01-01
      • 2011-03-19
      • 2023-03-25
      • 2014-07-10
      • 1970-01-01
      • 2019-12-29
      • 2016-10-21
      相关资源
      最近更新 更多