【问题标题】:Setting Celery Task attributes (i.e time_limit and soft_time_limit) does not work设置 Celery 任务属性(即 time_limit 和 soft_time_limit)不起作用
【发布时间】:2017-02-19 00:32:01
【问题描述】:

根据这个帖子,问题已解决,但似乎不是。 Setting Time Limit on specific task with celery

我当前的 Celery 版本是 3.1.18 (Cipter)。

我正在尝试覆盖任务的默认设置。目标是改变任务的软时间限制和硬时间限制,因为同一个任务有多种用途。

将 soft_time_limit 和 time_limit 传递给 MyTask 构造函数以更改默认设置。

///celery/app/ task.py
class MyTask(task.Task):   
    time_limit = 100
    soft_time_limit = 110
    max_retries = 0

def __init__(self, time_limit=None, soft_time_limit=None,
             max_retries=None, *args, **kwargs):
    if time_limit:
        self.time_limit = time_limit
    if soft_time_limit:
       self.soft_time_limit = soft_time_limit
    if max_retries:
       self.max_retries = max_retries
    task.Task.__init__(self, *args, **kwargs)


t1 = MyTask(time_limit=30, soft_time_limit=20,
        max_retries=5)
or

t1 = MyTask()
t1.time_limit = 30
t1.soft_time_limit = 20

然后将 t1.si() 传递给 task.RetryableChain(...)

job = task.RetryableChain(...)
job.delay()

当工作人员调用 run 方法时,它仍然接收旧值 (time_limit = 100),因为我设置了 time_limit = 30。

如果 3.1.18 版本仍然存在问题,请告诉我。

【问题讨论】:

    标签: celery celery-task


    【解决方案1】:

    我必须修复 celery 代码才能使其正常工作。这绝对是一个临时修复,但它有效。我不确定何时使用新值设置属性,然后为什么不将这些属性转移到 worker.job。我可以感觉到,当我们调用 task.si 或 s() 时,它会创建一个不包含这些 time_limit 属性的 Signature 实例,因此它取自存储在类中的原始值。只是一个想法。

    t1 = MyTask()
    kwargs = {}
    kwargs['time_limit'] = 30
    kwargs['soft_time_limit'] = 40
    
    t.s(kwargs)
    

    ---->>> /celery/worker/job.py

    def execute_using_pool(self, pool, **kwargs):
        """Used by the worker to send this task to the pool.
    
        :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
    
        :raises celery.exceptions.TaskRevokedError: if the task was revoked
            and ignored.
    
        """
        uuid = self.id
        task = self.task
        if self.revoked():
            raise TaskRevokedError(uuid)
    
        hostname = self.hostname
        kwargs = self.kwargs
        if task.accept_magic_kwargs:
            kwargs = self.extend_with_default_kwargs()
        request = self.request_dict
        request.update({'hostname': hostname, 'is_eager': False,
                        'delivery_info': self.delivery_info,
                        'group': self.request_dict.get('taskset')})
        timeout, soft_timeout = request.get('timelimit', (None, None))
        # timeout = timeout or task.time_limit
        # soft_timeout = soft_timeout or task.soft_time_limit
        **# SKAR  request.get(‘time limit’) always returns the original value stored in Task.
        timeout = kwargs.get('time_limit', task.time_limit)
        soft_timeout = kwargs.get('soft_time_limit', task.soft_time_limit)**
        result = pool.apply_async(
            trace_task_ret,
            args=(self.name, uuid, self.args, kwargs, request),
            accept_callback=self.on_accepted,
    

    【讨论】:

      猜你喜欢
      • 2017-06-15
      • 2019-10-17
      • 1970-01-01
      • 2018-09-17
      • 1970-01-01
      • 1970-01-01
      • 2018-12-16
      • 2012-10-16
      • 1970-01-01
      相关资源
      最近更新 更多