【问题标题】:Asynchronous celery task calls in django not getting pushed into rabbitmq queuedjango中的异步芹菜任务调用没有被推入rabbitmq队列
【发布时间】:2012-11-24 03:39:46
【问题描述】:

我的系统中安装了 celery==3.0.12 和 djcelery==3.0.11,django 版本为 1.4.1。我试图在我的一个项目中使用 celery 异步处理一些任务,但它不起作用。因此,为了进行测试,我开始了一个新的 django 项目,定义了示例任务 add 并像这样从 shell 调用它

 >>>res = add.delay(3, 5)

我试过res.statusres.get()res.ready(),所有这些都被阻止了。我正在使用 rabbitmq 管理插件在浏览器中监视 rabbitmq celery 队列。 celery 队列处于空闲状态,没有收到任何消息。

下面是目录树。

|-- manage.py
|-- new_app
|   |-- __init__.py
|   |-- __init__.pyc
|   |-- models.py
|   |-- models.pyc
|   |-- tasks.py
|   |-- tasks.pyc
|   |-- tests.py
|   `-- views.py
`-- testapp
    |-- __init__.py
    |-- __init__.pyc
    |-- settings.py
    |-- settings.pyc
    |-- urls.py
    `-- wsgi.py

以下是文件内容

new_app/tasks.py

from celery import task

@task
def add(x ,y):
    return x + y

testapp/settings.py

import djcelery
djcelery.setup_loader()

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

BROKER_HOST = 'localhost'
BROKER_PORT = 5672
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'

当我运行 python manage.py celeryd -l INFO 时,它正在创建 celery 队列。 以下是控制台输出。 http://dpaste.com/841960/

RabbitMQ 版本为 3.0.0

rabbitmqctl list_queues的输出

Listing queues ...
celery  0
h4ckb0x.celery.pidbox   0
...done.

【问题讨论】:

  • 你能运行rabbitmqctl list_queues 看看是否真的在Rabbit 中创建了队列和消息吗?
  • 用它的输出更新了问题。
  • 可能是您正在运行一个旧的损坏的工作进程,请尝试ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9。您可以使用rabbitmqctl list_queues name messages consumers 查看是否有任何消费者从队列中消费。如果您已经拥有BROKER_URL,那么这些BROKER_* 设置也是不必要的。

标签: python django rabbitmq celery


【解决方案1】:

尝试将您的操作与以下内容进行比较,并找出您的应用中缺少的内容:

配置兔子:

首先安装它(在我们的例子中是 ubuntu):

sudo apt-get update
sudo apt-get install rabbitmq-server
sudo mkdir /etc/rabbitmq/rabbitmq.conf.d

设置用户/psw/vhost:

sudo rabbitmqctl delete_user guest
sudo rabbitmqctl add_user <username> <password>
sudo rabbitmqctl set_user_tags <username> administrator
sudo rabbitmqctl add_vhost <some_name_for_vhost>
sudo rabbitmqctl set_permissions -p <some_name_for_vhost> <username> ".*" ".*" ".*"')

定义设置:

settings.py

​​>
RABBIT_USERNAME = <username>
RABBIT_PASSWORD = <password>
RABBIT_HOST = 'localhost' #or some server dns/ip
RABBIT_VHOST = <some_name_for_vhost>
BROKER_URL = 'amqp://%s:%s@%s:5672/%s' % (RABBIT_USERNAME,RABBIT_PASSWORD,RABBIT_HOST,RABBIT_VHOST)

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_CREATE_MISSING_QUEUES = True

定义芹菜:

celery.py

​​>
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yourapp.settings')

app = Celery('yourapp')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

定义任务:

tasks.py

​​>
from celery import task

@task(queue="my_queue")
def do_something_in_background(x1,x2):   
    #start doing something in the task
    #enter you code in here

运行工人:

(很好地杀死所有现有的,运行 10 个名为 app_worker 的工作人员,将它们连接到您的任务队列:my_queue)

run_workers.sh

#!/bin/bash
ps auxww | grep 'yourapp worker' | awk '{print $2}' | xargs kill
celery -A yourapp worker -Q my_queue -n app_worker -l info -c 10 -Ofair

定义一些测试:

test.py

​​>
import tasks
def my_func():
    tasks.do_something_in_background.delay(x1,x2)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-28
    • 1970-01-01
    • 1970-01-01
    • 2019-03-12
    相关资源
    最近更新 更多