【问题标题】:Django Celery: Task never executesDjango Celery:任务永远不会执行
【发布时间】:2020-06-28 15:51:50
【问题描述】:

在我的 django 应用程序中,我使用的是 celery。在 post_save 信号中,我正在更新弹性搜索中的索引。但是由于某种原因,任务被挂起并且从未真正执行过代码:

我用什么来运行 celery:

celery -A collegeapp worker -l info

信号:

@receiver(post_save, sender=University)
def university_saved(sender, instance, created, **kwargs):
    """
    University save signal
    """
    print('calling celery task')
    update_university_index.delay(instance.id)
    print('finished')

任务:

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
    print('updating university index')

我得到的唯一输出是calling celery task。等待超过 30 分钟后,它永远不会到达任何其他打印语句并且视图继续等待。 celery 终端中没有任何显示。

版本: Django 3.0, 芹菜 4.3, Redis 5.0.9, Ubuntu 18

更新: 经过一些测试后,使用celery.py 文件中定义的debug_task 代替update_university_index 不会导致挂起。它的行为与预期一样。我想也许它可能是 app.tasktask 装饰器,但似乎不是这样。

@app.task(bind=True)
def debug_task(text, second_value):
    print('printing  debug_task {} {}'.format(text, second_value))

【问题讨论】:

  • 你的 celery 版本和操作系统是什么?
  • 4.3 和 ubuntu。我会将它们包括在问题中。
  • 我假设你有工人并行运行?
  • 我不完全确定你的意思。
  • 抱歉,您的应用程序和工作人员同时在单独的终端上运行?

标签: django celery django-celery


【解决方案1】:

这发生在我身上一次,我犯了最愚蠢的错误,django 告诉我们在tasks.py 文件中指定 celery 任务,并将其用于任务发现。之后它起作用了。您能否使用tree 命令更深入地了解目录结构?

这个tutorial是给flask的,但是在django中也可以实现。本教程的亮点在于,在您告诉 celery 执行任务后,它还会为您提供一个uuid,您可以通过ping 来监控您触发的任务的进度。

使用 celery 验证任务是否已注册(请确保 celery 正在运行):

from celery.task.control import inspect
i = inspect()
i.registered_tasks()

或者重击

$ celery inspect registered
$ celery -A collegeapp inspect registered

来自https://docs.celeryproject.org/en/latest/faq.html#the-worker-isn-t-doing-anything-just-hanging

为什么 Task.delay/apply*/worker 只是挂起?

Answer: 一些 AMQP 客户端存在一个错误,如果它无法验证当前用户、密码不匹配或用户无权访问指定的虚拟主机,则会导致其挂起。请务必检查您的代理日志(对于大多数系统上的 /var/log/rabbitmq/rabbit.log 的 RabbitMQ),它通常包含描述原因的消息。

改变这一行

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
    print('updating university index')

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
    print('updating university index')

或者在任务定义中添加self

【讨论】:

  • 我会查看链接。当我最初运行 celery 时,会生成所有任务的列表。任务就在那里,所以我认为它正在被发现。树形结构为:collegeapp/university/tasks.py
  • 您能否在 celery 运行时使用以下命令进行验证,我已经更新了 sn-p 来实现这一点。
  • 顺便说一句,您是否使用 docker 进行编排?还是分别运行 docker 和 celery?
  • 我试过了,但似乎celery.task 没有control。也许您使用的是旧版本,但它已被弃用?
  • 我没有使用 docker。
【解决方案2】:

我仍然不确定为什么它不起作用,但我找到了一个解决方案,将 task 替换为 app.task

从我的celery.py 导入app 似乎已经解决了这个问题。

from collegeapp.celery import app

@app.task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
    print('updating university index')

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-01-18
    • 2020-04-01
    • 1970-01-01
    • 2013-06-30
    • 1970-01-01
    • 2019-01-28
    • 2020-12-26
    • 1970-01-01
    相关资源
    最近更新 更多