【问题标题】:How to bind celery application with Task class?如何将芹菜应用程序与任务类绑定?
【发布时间】:2017-05-14 04:16:32
【问题描述】:

我想重写 Celery 的 Task 类。我可以重写 on_success 和 on_failure 方法,但是 run 方法对我来说并不容易。我尝试使用 bind 方法。我的代码如下:

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def bind(self, app):
        return super(self.__class__, self).bind(app)

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        x = x**2


if __name__=="__main__":
     mytask = MyTask()
     app = Celery('mytask', backend='redis', broker='redis://localhost')
     mytask.bind(app)
     job = mytask.apply_async(data = 1)

但是当我运行代码时出现以下错误:

Received unregistered task of type None.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: None

我搜索了很多,但没有得到任何结果。我应该如何注册任务?

【问题讨论】:

    标签: python celery celery-task


    【解决方案1】:

    不需要显式绑定应用,因为 celery 任务在调用 apply_async 时会自动绑定到 current_app。 如果您想显式绑定,有两种方法: 1. task.bind(app) 2.从app.Task创建Task类。通过继承app.Task,将绑定此应用。

    您的问题与绑定无关。这是关于任务注册表。更正如下:

    from celery import Celery, Task
    
    class MyTask(Task):
        name = 'mytask'
    
        def on_success(self, retval, task_id, args, kwargs):
            print("success")
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print("failed")
    
        def run(self, *args, **kwargs):
            x = kwargs.get('data', None)
            print(x**2)
    
    
    if __name__ == "__main__":
        app = Celery('mytask', backend='redis', broker='redis://localhost')
        MyTask.bind(app)
        app.tasks.register(MyTask)
        app.worker_main()
    

    希望对您有所帮助。

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 2019-09-13
      • 1970-01-01
      • 2013-02-19
      • 2020-01-21
      • 1970-01-01
      • 1970-01-01
      • 2021-12-16
      • 2013-11-09
      相关资源
      最近更新 更多