【问题标题】:Python django rabbitmq celery problems with importing tasksPython django rabbitmq celery 导入任务的问题
【发布时间】:2015-04-22 00:56:59
【问题描述】:

欢迎...我正在创建一个项目,我在其中使用 xlrd 库解析 xlsx 文件。一切正常。然后我配置了 RabbitMQ 和 Celery。在主文件夹中创建了一些可以从 iPython 访问的任务。当我在我的应用程序中(在我的项目中及时创建的应用程序)并且我尝试在我的views.py中从我的应用程序中导入任务时,问题就开始了 我尝试使用所有可能的路径导入它,但每次它都会引发错误。 官方文档发布了从其他应用程序导入任务的正确方法,看起来是这样的: from project.myapp.tasks import mytask 但它根本不起作用。 此外,当我在 iPython 中时,我可以使用命令 from tango.tasks import add 导入任务 而且效果很好。

就在下面,我正在上传我的文件,控制台打印出错误。

views.py

# these are the instances that I was trying to import that seemed to be the most reasonable, but non of it worked
# import tasks
# from new_tango_project.tango.tasks import add
# from new_tango_project.tango import tasks
# from new_tango_project.new_tango_project.tango.tasks import add
# from new_tango_project.new_tango_project.tango import tasks
# from tango import tasks

#function to parse files
def parse_file(request, file_id):
    xlrd_file = get_object_or_404(xlrdFile, pk = file_id)
    if xlrd_file.status == False
        #this is some basic task that I want to enter to
        tasks.add.delay(321,123)

settings.py

#I've just posted things directly connected to celery
import djcelery
INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'tango',
    'djcelery',
    'celery',
)

BROKER_URL = "amqp://sebrabbit:seb@localhost:5672/myvhost"
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672
BROKER_VHOST = "myvhost"
BROKER_USER = "sebrabbit"
BROKER_PASSWORD = "seb"
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Warsaw'
CELERY_ENABLE_UTC = False

celery.py(在我的主文件夹new_tango_project

from __future__ import absolute_import
import os
from celery import Celery
import djcelery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'new_tango_project.settings')

app = Celery('new_tango_project')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# CELERY_IMPORTS = ['tango.tasks']
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
)

if __name__ == '__main__':
    app.start()

tasks.py(在我的主项目文件夹new_tango_project

from __future__ import absolute_import
from celery import Celery
from celery.task import task

app = Celery('new_tango_project',
             broker='amqp://sebrabbit:seb@localhost:5672/myvhost',
             backend='amqp://',
             include=['tasks'])

@task
def add(x, y):
    return x + y


@task
def mul(x, y):
    return x * y


@task
def xsum(numbers):
    return sum(numbers)

@task
def parse(file_id, xlrd_file):

    return "HAHAHAHHHAHHA"

tasks.py 在我的应用程序文件夹中

from __future__ import absolute_import
from celery import Celery
from celery.task import task    
#
app = Celery('tango')

@task
def add(x, y):
    return x + y

@task
def asdasdasd(x, y):
    return x + y

celery 控制台启动时

 -------------- celery@debian v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.2.0-4-amd64-x86_64-with-debian-7.8
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         new_tango_project:0x1b746d0
- ** ---------- .> transport:   amqp://sebrabbit:**@localhost:5672/myvhost
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

最后是我的控制台日志...

[2015-02-20 11:19:45,678: ERROR/MainProcess] Received unregistered task of type 'new_tango_project.tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': (123123123, 123213213), 'retries': 0, 'expires': None, 'task': 'new_tango_project.tasks.add', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'd9a8e560-1cd0-491d-a132-10345a04f391'} (233b)
Traceback (most recent call last):
  File "/home/seb/PycharmProjects/tango/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'new_tango_project.tasks.add'

这是导入任务的多次尝试之一的日志。

我在哪里犯错了?

最好的祝福

【问题讨论】:

    标签: python django import celery


    【解决方案1】:

    提示 1:在所有 tasks.py 中,您将 Celery 应用声明为 app = Celery(...),但您没有在任务装饰器中指定任务应附加到哪个应用。

    尝试将您的@task 更改为@app.task,看看是否有效。

    提示 2: 为什么需要在每个 tasks.py 中创建一个新的 Celery 应用程序?为什么不直接用from new_tango_project.celery import app 导入一个主要的Celery 应用程序,然后用@app.task 声明你的任务?

    提示 3:一旦定义了任务(可能在应用程序中的 celery.pytasks.py 中),就可以了

    from new_tango_project.celery import add
    from my_app.tasks import add_bis
    
    def my_view(request):
        ...
        add.delay(*your_params)   # using the task from your celery.py
        add_bis.delay(*your_params) # your task from the application
    

    【讨论】:

    • 不幸的是,这并没有帮助。
    • 我找到了解决方案。我从我应该的不同位置运行芹菜。但是现在当我尝试从主位置运行它时,该文件会看到所有文件中的所有任务,但是当我尝试像 add.delay() 那样运行它们时,它们没有执行,flower 或 celery 看不到结果。
    【解决方案2】:

    我想知道你是如何启动你的 celery worker 的。我遇到过一次,因为我没有正确启动 worker:你应该在执行“celery worker -l info”时添加 -A 选项,以便 celery 连接到你在 Celery Obj 中配置的代理。否则 celery 将尝试连接默认代理。

    【讨论】:

      猜你喜欢
      • 2011-07-18
      • 2011-07-17
      • 1970-01-01
      • 2018-09-20
      • 1970-01-01
      • 2014-01-18
      • 2015-08-08
      • 1970-01-01
      • 2014-12-23
      相关资源
      最近更新 更多