【问题标题】:Correct setup of django redis celery and celery beatsdjango redis celery 和 celery beats 的正确设置
【发布时间】:2018-07-13 21:50:45
【问题描述】:

我一直在尝试设置 django + celery + redis + celery_beats,但这给我带来了麻烦。文档非常简单,但是当我运行 django 服务器、redis、celery 和 celery beats 时,没有任何东西被打印或记录(我所有的测试任务都会记录一些东西)。

这是我的文件夹结构:

- aenima 
 - aenima
   - __init__.py
   - celery.py

 - criptoball
   - tasks.py

celery.py 看起来像这样:

from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aenima.settings')

app = Celery("criptoball")
app.conf.broker_url = 'redis://localhost:6379/0'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.timezone = 'UTC'

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, }

task.py 看起来像这样:

from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging

from django_celery_beat.models import PeriodicTask, IntervalSchedule

cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)

test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))

@shared_task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

这是django_celery_beat 文档

不知道我错过了什么。当我跑步时

celery -A aenima beat -l debug --scheduler django_celery_beat.scheduler:DatabaseScheduler

celery -A aenima worker -l 调试

redis-cli ping 乒乓

django runserver 和 redis 服务器,我什么也没打印。

settings.py

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)

到目前为止,还没有找到任何关于这个话题的权威答案。

我想解决这一切,这个错误可能只是众多错误之一。非常感谢您的帮助!

编辑:

为 redis 添加了设置,以不同的方式声明了任务并提高了调试级别。现在的错误是:

收到 u'tasks.test_celery' 类型的未注册任务。讯息 已被忽略并丢弃。

您还记得导入包含此任务的模块吗?或许 您正在使用相对进口? KeyError: u'aenima.criptoball.tasks.test_celery'

我认为 Celery 的文档很差。

编辑 2 在尝试了一切之后,当我将所有任务放在同一个 celery.py 文件中时,它就起作用了。 @shared_task 不起作用,必须使用 @app.task 。

【问题讨论】:

  • 你有一个芹菜工人在跑步吗?例如。从命令行,celery worker -A <your_module_name>。东西应该打印在你开始的终端上。
  • @Chris 更新了答案,增加了赏金。
  • 看起来 celery -A aenima beat celery -A aenima worker -l info 可能是转录/格式错误。你的意思是把它们放在两行吗?
  • @sytech 是的。感谢您的评论,已更正。
  • 我建议你使用 django-q。不是你的问题的答案,但我能感觉到你的痛苦。我用了一段时间芹菜,然后我放弃了。问题太多,当你解决了一个问题时,你还有另一个问题要调试。使用 django-q 要容易得多。祝你好运

标签: python django celery celerybeat


【解决方案1】:

我以前遇到过这些问题。这不是你的代码。这通常是环境的问题。 您应该运行virtualenv 下的所有内容,添加一个带有特定包版本requirements.txt 文件。

关于celery 4.xdjango 1.x 存在一个已知问题,因此您应该考虑您正在使用的软件包。

This 教程将详细解释如何使用 celery 构建virtualenv

如果您能告诉我您的软件包版本,我可能会尝试以不同的方式提供帮助。

编辑:

我认为这与您运行芹菜的方式有关。如果我们解决了第一个问题,试试这个:

celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

celery -A aenima.aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

您遇到的最新错误与您的模块发现有关。 先试试吧。

【讨论】:

  • 我正在使用 virtualenv。我有 django 1.10 和 celery 4.1 。我应该升级 django 吗?
  • 你应该将 celery 降级到 3.x。我在我的安装中做到了这一点,并且效果很好。
  • 我可以在文档中添加一个注释,它写成Celery 4.0 supports Django 1.8 and newer versions. Please use Celery 3.1 for versions older than Django 1.8.吗?所以 4.x 应该能够与 django 1.10 一起工作,如果有什么我建议将 django 升级到更高版本:)
  • 你说得对,但事实是它仍然存在问题。我认为我们仍然同意问题出在版本上。如果是我,我宁愿降级 celery 升级 django。如果你的项目很大,升级 django 可能会很头疼……
  • 是的,当您尝试升级 Django 时,它会带来很多麻烦,但是 Django 的版本最近增长得相当快,所以最终无论哪种方式都需要升级,但是是的。我同意这也可能是版本之间的差异。 :)
【解决方案2】:

使用virtualenv 会很方便。

首先就像@Gal 说你需要确保你有celery 4.x

你可以通过pip来安装它:

pip 安装芹菜

当然您也可以安装4.x 版本,将其添加到您的requirements.txt 中,如下所示:

芹菜==4.1.0

或更高版本(如果将来可用)。

然后您可以使用以下命令重新安装所有软件包:

  • pip install -r requirements.txt

这将确保您安装了特定的 celery 包。

现在是 Celery 部分,虽然你的代码可能没有错,但我会写下我如何让我的 Celery 应用程序工作。

__init __.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app

__all__ = ['celery_app']

celery_conf.py:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from datetime import timedelta

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<PATH.TO.YOUR.SETTINGS>')

app = Celery('tasks')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Set a beat schedule to update every hour.
app.conf.beat_schedule = {
    'update-every-hour': {
        'task': 'tasks.update',
        'schedule': timedelta(minutes=60),
        'args': (16, 16),
    },
}

# The default task that Celery runs.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import requests

from django.conf import settings
from django.http import HttpResponse

from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()


@python_2_unicode_compatible
class Update(Task):
    name = 'tasks.update'

    def run(self, *args, **kwargs):
        # Run the task you want to do.

""" For me the regular TaskRegistry didn't work to register classes, 
so I found this handy TaskRegistry demo and made use of it to 
register tasks as classes."""
class TaskRegistry(Task):

    def NotRegistered_str(self):
        self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))

    def assertRegisterUnregisterCls(self, r, task):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task)
        r.register(task)
        self.assertIn(task.name, r)

    def assertRegisterUnregisterFunc(self, r, task, task_name):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task_name)
        r.register(task, task_name)
        self.assertIn(task_name, r)

    def task_registry(self):
        r = TaskRegistry()
        self.assertIsInstance(r, dict, 'TaskRegistry is mapping')

        self.assertRegisterUnregisterCls(r, Update)

        r.register(Update)
        r.unregister(Update.name)
        self.assertNotIn(Update, r)
        r.register(Update)

        tasks = dict(r)
        self.assertIsInstance(
            tasks.get(Update.name), Update)

        self.assertIsInstance(
            r[Update.name], Update)

        r.unregister(Update)
        self.assertNotIn(Update.name, r)

        self.assertTrue(Update().run())

    def compat(self):
        r = TaskRegistry()
        r.regular()
        r.periodic()

正如我在代码中所解释的那样,常规的taskregistry 在 Celery 4.x 中内置的不起作用,所以我使用了演示任务注册表。 当然你也可以不使用类来做任务,但我更喜欢使用类。

settings.py:

# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'

# Celery routes
CELERY_IMPORTS = (
    'PATH.TO.tasks' # The path to your tasks.py
)

CELERY_DATABASE_URL = {
    'default': '<CELERY_DATABASE>', # You can also use your already being used database here
}

INSTALLED_APPS = [
    ...
    'PATH.TO.TASKS' # But exclude the tasks.py from this path
]

LOGGING = {
    ...
    'loggers': {
        'celery': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True,
        },
    }
}

我使用以下命令启动我的工人:

redis-server --daemonize 是的

celery multi start worker -A PATH.TO.TASKS -l info --beat #但排除tasks.py路径

我希望这些信息可以帮助您或任何在 Celery 上苦苦挣扎的人。

编辑:

请注意,我将工作程序作为守护程序启动,因此您实际上无法在控制台中看到日志。 对我来说,它记录在 .txt 文件中。

另外还要注意要使用的路径,例如对于某些您需要包含应用程序的路径,如下所示:

project.apps.app

对于其他情况,您还需要包含不带.py 的tasks.py,我写下了何时排除该文件以及何时不排除。

编辑 2:

@shared_task 装饰器返回一个始终使用 current_app 中的任务实例的代理。 这使得 @shared_task 装饰器对库和可重用应用程序很有用,因为它们无法访问用户的应用程序。

请注意@shared_task 无权访问用户的应用程序。 您当前尝试注册的应用无权访问您的应用。 您实际要用于注册任务的方法是:

from celery import Celery
app = Celery()

@app.task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

【讨论】:

  • 感谢您的回答...更改了我的代码,您认为它必须打印类似的内容吗?我应该如何知道测试是否正在执行?
  • @alejoss 如果你想在控制台中看到一些东西,那么你首先需要运行 redis 作为守护进程 redis-server --daemonize yes 然后 celery worker 不是作为守护进程 celery -A aenima -l info --beat,你还必须启用celery logger 在你的 django 设置中,你必须查找如何设置 celery logger。
  • 添加了一个编辑,我想我更接近让它工作。出现了一个新的错误。如果可以的话,请看一下。
  • @alejoss 我给了你如何注册任务的完整设置,你的任务只是没有注册,应该很容易解决谷歌搜索?
  • 请不要推荐 Celery 4.1——它有一些错误会阻止周期任务正常运行。
【解决方案3】:

收到 u'tasks.test_celery' 类型的未注册任务。该消息已被忽略并丢弃。

您还记得导入包含此任务的模块吗?或者您可能正在使用相对导入?

可能你的任务路径不正确,应该是:

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'criptoball.tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, 
}

tasks.test_celery应该是完整路径:criptoball.tasks.test_celery

【讨论】:

    【解决方案4】:

    你应该修复一件事,使用:

    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    

    告诉 Celery 如果您使用的是 Celery 3.x,您希望它发现哪些应用程序的任务。

    【讨论】:

      猜你喜欢
      • 2018-02-04
      • 2020-11-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-02
      • 2013-07-01
      • 2019-05-02
      • 2016-06-09
      相关资源
      最近更新 更多