【发布时间】:2017-03-25 21:22:26
【问题描述】:
基于这些示例:
https://blog.balthazar-rouberol.com/celery-best-practices
https://shulhi.com/2015/10/13/class-based-celery-task/
Class based Task in django celery
我想创建类似的东西:
class CalculationWorker(Task):
def __init__(self, id_user):
self._id_user = id_user
self._user = get_object_or_404(User, pk=self._id_user)
# I need to understand if the bind work or if it's needed
def _bind(self, app):
return super(self.__class__, self).bind(celery_app)
def _retrieve_some_task(self):
# long calculation
def _long_run_task(self):
# long calculation
self._retrieve_some_task()
# Main entry
def run(self):
self._long_run_task()
#
def run_job():
worker = CalculationWorker(id_user=323232)
task = worker.apply_async()
文档似乎说这是可能的(无论如何我不清楚)http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes。它甚至说:
""" 这意味着 init 构造函数每个进程只会被调用一次,并且任务类在语义上更接近于 Actor。 ""
但http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks 明确表示:“最佳实践是仅使用自定义任务类来覆盖一般行为,然后使用任务装饰器来实现任务”。
因此,由于https://github.com/celery/celery/issues/3548,我得到了一个 NotRegistered 异常,但添加 app.tasks.register(CalculationWorker()) 并没有解决它。
我正在使用 Django 1.10.X 和 Celery 4.0.0
这种方法仍然有效吗?
谢谢
【问题讨论】: