【问题标题】:Cant get Celery/RabbitMQ to run my shared task in Django无法让 Celery/RabbitMQ 在 Django 中运行我的共享任务
【发布时间】:2019-08-02 21:51:42
【问题描述】:

我有一个已经部署并在生产中工作的应用程序,但这一切都是由其他人完成的。我现在正在尝试制作环境的本地版本,但我似乎无法让我的本地 Celery/RabbitMQ 实际运行该任务。

应用程序非常大,因此我不会尝试将其全部发布在这里,但我从调试中获得了一些可能有用的线索。一个是这个。当我运行以下功能时:

task_id = celery_send_playbook_msg_util.apply_async([brand_user.id, pb['id'], sequence_id, '', False, False, message_id,
                                                   pb['playbook'], event_type == constants.event_types['Abandoned']],
                                                  eta=delivery_datetime, queue='high_priority', priority=8)

print("Celery Task ID: " + str(task_id))

实际上,我确实得到了一个 UUID 样式的 task_id 作为回报。这向我表明 Celery Broker 正在运行。我还尝试了以下芹菜经纪人的配置选项(到目前为止都没有工作)

#BROKER_URL = 'amqp://test:test@192.168.33.10:5672//'
#BROKER_URL = 'amqp://test:test@localhost:5672//'
#BROKER_URL = 'amqp://test:test@localhost//'
BROKER_URL = 'amqp://test:test@192.168.33.10//'

其他线索:

我突然想到,查看我用来启动工作人员的命令的输出可能会有所帮助,所以这里是:

celery -A Python worker --loglevel=debug

 -------------- celery@vagrant v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.15.0-29-generic-x86_64-with-Ubuntu-18.04-bionic 2019-08-02 20:42:29
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         Python:0x7fa1367f0650
- ** ---------- .> transport:   amqp://test:**@192.168.33.10:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Python.celery.debug_task
  . Python.celery.send_messages_daily_unreadcount
  . Sensus.tasks.bulk_manual_optin_from_csv_task
  . Sensus.tasks.celery_csv_upload_send_message
  . Sensus.tasks.celery_send_messages_daily_util
  . Sensus.tasks.celery_send_msg_util
  . Sensus.tasks.celery_send_payment_message
  . Sensus.tasks.celery_send_playbook_msg_util
  . Sensus.tasks.consolidate_messages_and_analyze_sentiment
  . Sensus.tasks.scheduled_broadcast_task
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

[2019-08-02 20:42:29,590: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,592: INFO/MainProcess] Connected to amqp://test:**@192.168.33.10:5672//
[2019-08-02 20:42:29,601: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,603: INFO/MainProcess] mingle: searching for neighbors
[2019-08-02 20:42:29,604: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,606: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:29,621: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,623: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,624: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,630: INFO/MainProcess] mingle: all alone
[2019-08-02 20:42:30,636: DEBUG/MainProcess] using channel_id: 2
[2019-08-02 20:42:30,637: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,641: DEBUG/MainProcess] using channel_id: 3
[2019-08-02 20:42:30,642: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,645: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:30,646: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,649: WARNING/MainProcess] /home/vagrant/.local/lib/python2.7/site-packages/celery/fixups/django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2019-08-02 20:42:30,650: INFO/MainProcess] celery@vagrant ready.
[2019-08-02 20:42:30,651: DEBUG/MainProcess] basic.qos: prefetch_count->4
[2019-08-02 20:42:50,649: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:42:50,651: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/58, monotonic - 11221.8028604, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,655: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,656: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/58, now - 28/88, monotonic - 11241.8086932, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621

【问题讨论】:

  • 你能看到队列中的任务吗?
  • 我可以建议 - 如果您是 django/celery 的新手,请启动一个新项目。尝试让整个架构正常工作,尝试一些设置和配置。一旦你感到舒服,再次选择非常大的项目。有很多地方可能会出错,因此自己设置 celery 的配置可能是一种学习体验。
  • @skybunk 谢谢。那是一些人的建议,所以我尝试了它,但没有什么可学的。不过我最终还是弄明白了(我会在几分钟后发布答案......)

标签: rabbitmq django-celery


【解决方案1】:

所以我想通了。方法如下。

首先我使用以下命令查看队列中的内容:

sudo rabbitmqctl list_queues

这给了我以下输出:

Listing queues
d68c3a7d-ed35-3c79-b571-0d01ccda84ad    1
2753309c-9f03-399c-871d-5b4ffcbea462    0
high_priority   23
8ce8d7e0-0081-3937-80fb-ff238be8f410    1
4ce2ecce-6954-3c07-857a-4221fe613e72    0
celery  0
celery@vagrant.celery.pidbox    0
celeryev.1a7429e0-48b2-4ead-925c-42ee1855247d   0
8127f8e8-073c-3972-a563-829ab207b964    0

我很好奇“high_priority”旁边的 23 是什么,我注意到每次我尝试应该放入队列中的东西时它都会不断上升。事实证明,在我的应用程序中,当我们将某些内容放入队列时,我们不只是将其放入一般队列中,而是将其放入名为“high_priority”的队列中。因为我没有注意到这一点,所以我开始让我的工作人员查看一般队列。为了解决这个问题,我在 worker 调用中添加了一个 -Q 选项,如下所示:

celery -A Python worker --loglevel=debug -Q high_priority

这样就解决了问题

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-01-18
    • 2011-07-18
    • 2011-07-17
    • 2015-08-08
    • 2018-11-27
    • 1970-01-01
    • 1970-01-01
    • 2020-06-30
    相关资源
    最近更新 更多