【发布时间】:2017-03-04 11:25:30
【问题描述】:
我正在尝试在 Celery 任务中实现线程池。
我的 Celery 任务调用 update_state() 函数将有关任务状态的信息发送到数据库。它工作成功。 但是,当我将线程添加到任务中并尝试在每个线程中调用 update_state() 函数时,Celery 会返回错误。
这是工作示例(没有线程):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
这不是工作示例(使用线程):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
错误是:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
是什么原因?为什么我不能调用 update_state() 进入线程?
【问题讨论】:
标签: python multithreading celery celery-task