【发布时间】:2017-05-14 04:16:32
【问题描述】:
我想重写 Celery 的 Task 类。我可以重写 on_success 和 on_failure 方法,但是 run 方法对我来说并不容易。我尝试使用 bind 方法。我的代码如下:
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print("success")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("failed")
def bind(self, app):
return super(self.__class__, self).bind(app)
def run(self, *args, **kwargs):
x = kwargs.get('data', None)
x = x**2
if __name__=="__main__":
mytask = MyTask()
app = Celery('mytask', backend='redis', broker='redis://localhost')
mytask.bind(app)
job = mytask.apply_async(data = 1)
但是当我运行代码时出现以下错误:
Received unregistered task of type None.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: None
我搜索了很多,但没有得到任何结果。我应该如何注册任务?
【问题讨论】:
标签: python celery celery-task