【问题标题】:Python multithreading into Celery task. celery_task.update_state() errorPython 多线程进入 Celery 任务。 celery_task.update_state() 错误
【发布时间】:2017-03-04 11:25:30
【问题描述】:

我正在尝试在 Celery 任务中实现线程池。

我的 Celery 任务调用 update_state() 函数将有关任务状态的信息发送到数据库。它工作成功。 但是,当我将线程添加到任务中并尝试在每个线程中调用 update_state() 函数时,Celery 会返回错误。

这是工作示例(没有线程):

import celery

@celery.task(bind=True)
def get_info(self, user):
    for i in xrange(4):
        self.update_state(state=states.SUCCESS, meta={'subtask_id': i})

这不是工作示例(使用线程):

import celery
import threading

lock = threading.Lock()

def run_subtask(celery_task, i):
    lock.acquire()
    #Error raises here, when update_state calls
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
    lock.release()

@celery.task(bind=True)
def get_info(self, user):

    for i in xrange(4):
        worker = threading.Thread(target=run_subtask, args=(self, i))
        worker.start()

错误是:

    [2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", 
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key), 
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found

是什么原因?为什么我不能调用 update_state() 进入线程?

【问题讨论】:

    标签: python multithreading celery celery-task


    【解决方案1】:

    Celery 向线程添加了一种上下文对象,因此它知道它与哪个任务相关。为了将线程与任务相关联,您需要执行以下操作:

    from celery.app import push_current_task
    
    
    def run_subtask(celery_task, i):
        push_current_task(celery_task)
    
        ...
    
        pop_current_task()
    

    【讨论】:

    • 非常感谢!我已经找到了解决方案并将其发布在这里。我没有测试您的解决方案,但我认为它具有相同的方向。
    • 我的是文档推荐的方式。我建议你使用我的。
    【解决方案2】:

    我找到了答案!这是 Celery 贡献者之一的回答:

    task.request 是一个本地线程,所以只有执行任务的线程才能调用 update_state。

    如果您认为线程可能与存储结果的任务发布处理程序竞争,这尤其有意义。

    您可以将task_id传递给线程:

    cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})
    

    但是您必须确保线程在任务退出之前已加入并停止(thread.join())。 在您的示例中,线程只能在 while 循环退出后加入,并且由于您正在休眠 1 秒,因此可以延迟加入。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-09-18
      • 2018-08-03
      • 1970-01-01
      • 2016-03-31
      • 2013-12-12
      • 2012-02-20
      相关资源
      最近更新 更多