【问题标题】:Can't import models in tasks.py with Celery + Django无法使用 Celery + Django 在 tasks.py 中导入模型
【发布时间】:2018-12-27 20:12:52
【问题描述】:

我想创建一个后台任务来更新特定日期的记录。我将 Django 和 Celery 与 RabbitMQ 一起使用。

我已经设法在使用这个虚拟任务函数保存模型时调用任务:

tasks.py

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', broker='amqp://localhost//')


@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    # (I pass the news id and return it, nothing complicated about it)
    return news_id

这个任务是从我的 models.py

中的 save() 方法调用的
from django.db import models
from celery import current_app


class News(models.model):
    (...)

    def save(self, *args, **kwargs):

        current_app.send_task('news.tasks.update_news_status', args=(self.id,))

        super(News, self).save(*args, **kwargs)

事情是我想在tasks.py中导入我的新闻模型,但如果我尝试这样:

from .models import News

我收到此错误:

django.core.exceptions.ImproperlyConfigured:请求的设置 DEFAULT_INDEX_TABLESPACE,但未配置设置。你必须 要么定义环境变量 DJANGO_SETTINGS_MODULE 要么调用 settings.configure() 在访问设置之前。

这就是 mi celery.py 的样子

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

我已经试过了:

  1. can't import django model into celery task
  2. 我已尝试在任务方法Django and Celery, AppRegisteredNotReady exception 中进行导入
  3. 我也试过这个Celery - importing models in tasks.py
  4. 我也尝试创建一个 utils.py 并将其导入,但不可行。

遇到了不同的错误,但最后我无法在 tasks.py 中导入任何模块

我的配置可能有问题但我看不到错误,我按照The Celery Docs: First steps with Django中的步骤操作

另外,我的项目结构是这样的:

├── myapp
│   ├── __init__.py
├── ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── news
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── tasks.py
│   ├── urls.py
│   ├── models.py
│   ├── views.py
├── manage.py

我正在像这样从myapp 目录执行工作人员:

celery -A news.tasks worker --loglevel=info

我在这里缺少什么?提前感谢您的帮助!

lambda:settings.INSTALLED_APPS

编辑

在 cmets 中进行建议的更改后: 将此添加到 celery.py app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

并导入内部方法:tasks.py

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', broker='amqp://localhost//')


@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    from .models import News
    return news_id

我收到以下错误:

[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
    from .models import News
ValueError: Attempted relative import in non-package

【问题讨论】:

  • 你能用这个app吗.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 什么是芹菜版本>>>???
  • 嗨@HemanthSP 添加这不起作用。 Celery 版本是 4.2.1
  • 你可以试试 celery -A news worker -l info,你的代码在 redis 上运行良好。并尝试将 task.py 中的每个代码移动到 celery.py 并再次检查
  • 此处显示的代码有效,我无法实现的是将 News 模型导入 tasks.py @HemanthSP
  • 您的错误来自 django 或 celery 终端?

标签: python django celery python-import


【解决方案1】:

试试这样的。它在 3.1 celery 中工作,导入应该发生在 save 方法内部和 super() 之后

from django.db import models



class News(models.model):
    (...)

    def save(self, *args, **kwargs):
        (...)
        super(News, self).save(*args, **kwargs)
        from task import update_news_status
        update_news_status.apply_async((self.id,)) #apply_async or delay

【讨论】:

  • 嗨,问题不在models.py上,而是在tasks.py上,我需要导入News模型
  • 我只知道会出现这样的错误,您尝试过我的解决方案吗?
  • 是的,但是我已经在 models.py 中调用了任务,那里没有问题。问题是相反的,从模型导入新闻里面 tasks.py 是不可能的,所以我不能对任务做任何事情
  • 我在我的 tasks.py 中导入了很多模型,它对我来说就像一个魅力
【解决方案2】:

好吧,对于任何为此苦苦挣扎的人......原来我的 celery.py 没有从设置中读取环境变量。

经过一周的大量研究后,我意识到 Celery 不是 Django 的进程,而是在它之外运行的进程(duh),所以当我尝试加载设置时,它们已加载但然后我无法访问我在 .env 中定义的 env 变量(我使用 dotenv 库)。 Celery 试图在我的 .bash_profile 中查找环境变量(当然)

所以最后我的解决方案是在定义我的celery.py 的同一目录中创建一个帮助模块,称为load_env.py,并带有以下内容

from os.path import dirname, join
import dotenv


def load_env():
    "Get the path to the .env file and load it."
    project_dir = dirname(dirname(__file__))
    dotenv.read_dotenv(join(project_dir, '.env'))

然后在我的 celery.py 上(注意最后的导入和第一条指令)

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env

load_env()

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

app = Celery('myapp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.

app.config_from_object('myapp.settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在调用load_env() env 变量被加载并且 celery worker 可以访问它们之后。通过这样做 我现在可以从我的 tasks.py 访问其他模块,这是我的主要问题。

感谢this guys (Caktus Consulting Group) 和他们的django-project-template,因为如果不是他们,我将找不到答案。谢谢。

【讨论】:

    【解决方案3】:

    这是我要做的(Django 1.11 和 celery 4.2),你的 celery 配置有问题,你尝试重新声明 Celery 实例:

    tasks.py

    from myapp.celery import app # would contain what you need :)
    from celery.utils.log import get_task_logger
    
    logger = get_task_logger(__name__)
    
    @app.task(name='news.tasks.update_news_status')
    def update_news_status(news_id):
        # (I pass the news id and return it, nothing complicated about it)
        return news_id
    

    celery.py

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    from django.conf import settings
    import os
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
    app = Celery('myapp', backend='rpc://', broker=BROKER_URL) # your config here
    app.config_from_object('django.myapp:settings', namespace='CELERY') # change here
    app.autodiscover_tasks()
    

    models.py

    from django.db import models
    
    class News(models.model):
        (...)
        def save(self, *args, **kwargs):
            super(News, self).save(*args, **kwargs)
            from news.tasks import update_news_status
            update_news_status.delay(self.id) # change here
    

    并使用celery -A myapp worker --loglevel=info 启动它,因为您的应用程序是在 myapp.celery 中定义的,因此 -A 参数需要是声明 conf 的应用程序

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-06-28
      • 2016-08-13
      • 1970-01-01
      • 2017-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多