【发布时间】:2019-02-22 13:52:40
【问题描述】:
我有以下脚本:
celery_tasks.py
from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'
@app.task(acks_late=True)
def test(a):
return a
发布.py
from celery_tasks import test
test.delay('abc')
当我运行 publish.py 并启动 worker (celery -A celery_tasks worker --loglevel=DEBUG) 时,'abc' 内容会发布在 'test_queue' 中并被 worker 使用。
有没有办法让工作人员从不是由 Celery 发布的队列中消费一些东西?例如,当我直接通过 RabbitMQ 将某些内容放入 test_queue 中,而不通过 Celery 发布者,并运行 Celery 工作者时,它给了我以下警告:
WARNING/MainProcess] 收到并删除未知消息。目的地错误?!?
邮件正文的完整内容是:body: 'abc' (3b)
{content_type:无 content_encoding:无 delivery_info:{'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers={}}
有没有办法解决这个问题?
【问题讨论】:
-
你是如何手动将消息发布到rabbitmq的?从文档中查看,看起来 content_type 和 content_encoding 是必需的,并且可能还有其他必填字段。