【问题标题】:How to manage tasks in Celery?如何管理 Celery 中的任务?
【发布时间】:2011-07-16 18:52:50
【问题描述】:

我有一个带有某个参数的任务,我想知道是否有一个带有相同参数的任务。 我有以下内容:

@task
def some_task(id):
    some_task.update_state(state="PROGRESS", meta={"id": id})
    some_action_by_id(id)

但我想要这个:

@task
def some_task(id):
    if !check_task(id):
    some_task.update_state(state="PROGRESS", meta={"id": id})
    some_action_by_id(id)   

我怎样才能做到这一点?

【问题讨论】:

    标签: python celery task


    【解决方案1】:

    您可能的意思是,您希望每时每刻只为每个唯一 ID 运行一个“some_task”。所以你必须实现一个锁定机制。看看here。 Celery 与 Redis 配合得很好!

    【讨论】:

    • 作者的意思是:t = add.delay(2,4) 不能同时执行两次。 t1 = add.delay(2, 4), t.id 应该等于 t1.id
    【解决方案2】:

    这是我的解决方案:

    from celery.task.control import inspect
    from celery.result import AsyncResult
    
    def get_same_task(aTaskName, aArgs, aHosts):
        for jobs in aHosts.values():
            for job in jobs:
                if job['name'] == aTaskName and job['args']  == str(aArgs):
                    return job['id']
        return None
    
    class IgnoreSameArgumentsTask(Task):
        abstract = True
        inspect = inspect()
    
        def delay(self,  *args, **kwargs):
            vHosts_Jobs   = self.inspect.active()
            vTaskId = get_same_task(self.name, args, vHosts_Jobs)
            if vTaskId != None:
                return AsyncResult(vTaskId)
            else:
                return super(IgnoreSameArgumentsTask, self).delay(*args,  **kwargs)
    
    @celery.task(base=IgnoreSameArgumentsTask)
    def add(x, y):
        sleep(x+y)
        return x + y
    

    【讨论】:

      猜你喜欢
      • 2015-05-16
      • 1970-01-01
      • 2018-06-03
      • 2016-10-14
      • 1970-01-01
      • 2015-05-02
      • 1970-01-01
      • 2016-03-22
      • 2020-09-15
      相关资源
      最近更新 更多