【问题标题】:Decorator that includes celery task with attributes包含具有属性的 celery 任务的装饰器
【发布时间】:2021-06-25 11:14:25
【问题描述】:

我正在开发一个包含多个应用程序的 Django 项目,每个应用程序都有自己的任务集,并且运行良好。但是,一般来说,每个应用程序的任务集使用相同的属性来进行速率限制、重试等。我试图创建一个具有所有这些通用属性的装饰器,并将目标函数设置为任务。

所以,我的example/tasks.py 中有这个:

from celery import current_app


@current_app.task(
    queue='example_queue',
    max_retries=3,
    rate_limit='10/s')
def example_task_1():
    ...

@current_app.task(
    queue='example_queue',
    max_retries=3,
    rate_limit='10/s')
def example_task_2():
    ...

我正在尝试类似的东西:

from celery import current_app, Task


class ExampleTask(Task):

    def __init__(self, task, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task = task

    def run(self):
        self.task()


def example_decorator_task(func):

    @wraps(func)
    def wrapped(self, *args, **kwargs):
        return ExampleTask(func).delay(
            queue='example_queue',
            max_retries=3,
            rate_limit='10/s')

@example_decorator_task
def example_task_1():
    ...

@example_decorator_task
def example_task_2():
    ...

我得到了这个工作,但调用example_task_1.delay(...),任务将无法像往常一样工作,因为正在包装内执行。 有什么想法吗?

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    当您传递参数时,参数化装饰器会转换为无参数装饰器,但不会将它们应用于函数。因此,您可以通过将结果存储到变量中来创建新的装饰器,然后您可以将其应用于各种任务函数。见:

    from celery import current_app
    
    rate_limited_task = current_app.task(
        queue='example_queue',
        max_retries=3,
        rate_limit='10/s'
    )
    
    @rate_limited_task
    def example_task_1():
        ...
    
    @rate_limited_task
    def example_task_2():
        ...
    

    【讨论】:

      【解决方案2】:

      使用apply_async而不是delay,你的装饰器应该变成这样:

      def decorator(function):
          def wrapper(*args, **kwargs):
              return function.apply_async(args=[*args], kwargs={**kwargs}, **{
                  'queue': 'example_queue',
                  'max_retries': 3,
                  'rate_limit': '10/s'
              })
          return wrapper
      

      更多详情:

      【讨论】:

      • 好的,很酷 :) 但有了这个我仍然需要同时使用:@current_app.task@decorator,是否可以使 decorator 具有与 @current_app.task 相同的功能 +我想要的设置?
      • 对不起,我一开始没有抓住问题,所以我没有确切的答案,但调用 celeryapp.task 装饰器并将函数传递给它,然后调用返回值上的apply_async 可能有效。
      猜你喜欢
      • 2013-12-04
      • 1970-01-01
      • 2012-09-28
      • 2018-08-16
      • 2021-11-05
      • 1970-01-01
      • 2021-02-25
      • 2014-02-15
      • 1970-01-01
      相关资源
      最近更新 更多