【问题标题】:start celery worker and enable it for broadcast queue启动 celery worker 并为广播队列启用它
【发布时间】:2017-10-23 19:53:14
【问题描述】:

我正在尝试启动 celery worker,所以它只听单个队列。这不是问题,我可以这样做:

python -m celery worker -A my_module -Q my_queue -c 1

但现在我也希望这个 my_queue 队列成为广播队列,所以我在 celeryconfig 中这样做:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

但是一旦我这样做了,我就不能再启动我的工人了,我从 rabbitmq 收到错误:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

如果我在没有-Q 的情况下启动worker(但如上所述将Broadcast 留在celeryconfig.py 中)并列出rabbitmq 队列,我可以看到广播队列已创建并命名如下:

bcast.43fecba7-786a-461c-a322-620039b29b8b

同样,如果我在 worker 中定义这个队列(如上所述使用 -Q)或在 celeryconfig.py 中像这样简单的 Queue

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

我可以像这样在 rabbitmq 中看到这个队列:

my_queue

在定义队列时,我在 Broadcast 调用中添加了什么并不重要 - 这似乎是内部 celery 名称,而不是传递给 rabbitmq。

所以我猜当工人开始时,然后my_queue 被创建,一旦完成它就不能被创建Broadcast

我可以让一个工作人员监听任何队列(不仅是 my_queue),我将从删除 -Q 参数开始。但是如果能够有一个进程只监听那个特定的队列就好了,因为我在那里投入的任务很快,而且我希望尽可能地降低延迟。

--- 编辑 1 --- 花了一些时间解决这个问题,似乎上面提到的bcast 队列并没有始终如一地出现。重置rabbitmq并在没有-Q选项的情况下运行celerybcast队列没有出现......

【问题讨论】:

  • 建议:将“my_queue”替换为新的广播队列名称,并使用“-Q new_broadcast_queue_name”启动您的工作进程,试一试。
  • 你用的是哪个 celery 版本?

标签: python rabbitmq celery


【解决方案1】:

当使用代理发送消息时,客户端和工作人员必须就相同的配置值达成一致。如果您必须更改配置,则需要清除现有消息并重新启动所有内容以使它们同步。

启动广播队列时,您可以设置交换类型并配置队列。

from kombu.common import Broadcast
from kombu import Exchange


exchange = Exchange('custom_exchange', type='fanout')

CELERY_QUEUES = (
    Broadcast(name='bcast', exchange=exchange),
)

现在你可以用

celery worker -l info -A tasks -Q bcast 

【讨论】:

  • 谢谢!我看过一些关于 custom_exchange 的建议,但不是这种形式。
  • 阿南德,这与你的回答有关。我配置了扇出队列,但似乎工作人员将任务从队列中取出,但随后他们随机不处理任务。当我检查 active_queues 时,我可以看到这些任务列在队列中,并且它们没有任何反应。你能帮忙吗?
  • @Greg0ry 我不确定工人是否随机不处理任务。你可以在调试模式下运行worker(-l debug)并查看日志吗?这可能有助于理解为什么会发生这种情况。
  • 谢谢阿南德,我设法让这个工作。我的问题特定于广播任务中发生的事情。在一台主机上,我错误地配置了我的 celery 服务以启动两个处理广播队列的工作人员,如果两个工作人员同时请求相同的资源,则任务逻辑将失败。再次感谢!
  • 我在迁移到 celery 4.x 后确实遇到了一些随机问题,所以我会在这里为可能有同样问题的其他人发表评论。首先,当启动 worker 时,您只指定进程数(即c:3 1)而没有队列号,那么在将它们发送到扇出队列时您会看到重复的任务。第二 - 记得每次更改队列时清除你的rabbitmq(即stop appresetstart app,这将删除你的rabbitmq状态),因为在大多数情况下由芹菜创建的队列(取决于配置)将是持久的.
猜你喜欢
  • 2014-12-24
  • 2018-11-03
  • 2018-12-13
  • 2019-07-18
  • 1970-01-01
  • 1970-01-01
  • 2020-04-20
  • 1970-01-01
  • 2017-11-29
相关资源
最近更新 更多