【问题标题】:Routing celery tasks路由 celery 任务
【发布时间】:2020-04-06 00:34:25
【问题描述】:

尝试创建两个单独的专用工作人员时,我无法向 celery 发送任务。我浏览了文档和this question,但并没有改善我的情况。

我的配置如下:

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}

任务在tasks.py文件中定义如下:

import logging
import time

from celery import shared_task


from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command


logger = logging.getLogger(__name__)


@shared_task
def list_test_books_per_author():
    time.sleep(5)
    queryset = Author.objects.all()
    for author in queryset:
        for book in author.testing_books:
            logger.info(book.title)


@shared_task
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))

它们是使用apply_async 调用的:

list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))

当我运行 celery flower 时,我发现队列中没有任务。

工人开始使用:

celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=default@%h

我可以做的是使用redis-cli127.0.0.1:6379> LRANGE celery 1 100 命令确认它们最终在celery 键下(这是芹菜的默认键)。似乎没有工人消费。

编辑 在仔细查看this part of documentation 之后,我注意到我的命名是错误的。将设置更改为:

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}

情况有所改善:任务从default队列消耗,但我想去media队列的任务也去default

EDIT2 我试图通过将某个任务的调用更改为resize_book_photo.apply_async((book.id,), queue='media') 来明确告诉某个任务转到其他队列。该任务已正确分派到正确的队列并被使用。但是,我更喜欢自动处理这个问题,这样我就不必在每次调用apply_async时定义队列@

【问题讨论】:

  • 你试过改变这个 celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=media@%h for this celery -A blacksheep worker -l info - -autoscale=10,1 -Q 媒体 --host=media@%h ?参数 -Q 名称
  • 是的。这无济于事,因为问题在于将任务从后端路由到正确的队列,而不是在它们被代理保存后消耗它们。

标签: python django celery


【解决方案1】:

尝试使用CELERY_TASK_ROUTES 而不是CELERY_ROUTES。这对我最近使用 django 集成很有用。

解释隐藏在此评论中:How to route tasks to different queues with Celery and Django

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-04-22
    • 1970-01-01
    • 2019-01-08
    • 1970-01-01
    • 1970-01-01
    • 2017-08-12
    • 1970-01-01
    相关资源
    最近更新 更多