【问题标题】:How to call a Celery shared_task?如何调用 Celery shared_task?
【发布时间】:2018-09-17 16:26:19
【问题描述】:

我正在尝试在我的应用程序中使用 stream_framework(不是 Django),但我在调用 stream_framework 共享任务时遇到了问题。 Celery 似乎找到了任务:

-------------- celery@M3800 v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-4.15.0-34-generic-x86_64-with-Ubuntu-18.04-bionic
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         task:0x7f8d22176dd8
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . formshare.processes.feeds.tasks.test_shared_task
  . stream_framework.tasks.fanout_operation
  . stream_framework.tasks.fanout_operation_hi_priority
  . stream_framework.tasks.fanout_operation_low_priority
  . stream_framework.tasks.follow_many
  . stream_framework.tasks.unfollow_many

[2018-09-17 10:06:28,240: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-09-17 10:06:28,246: INFO/MainProcess] mingle: searching for neighbors
[2018-09-17 10:06:29,251: INFO/MainProcess] mingle: all alone

我用以下方式运行 celery:

celery -A formshare.processes.feeds.celery_app worker --loglevel=info

我的 celery_app 有:

from celery import Celery

celeryApp = Celery('task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0', include='formshare.processes.feeds.tasks')

问题是 delay() 不运行共享任务。我还在我的应用程序中创建了一个共享任务,但是当我调用 delay() 时,也不会调用该任务。我想我需要将它们注册为可从我的应用程序中调用?我似乎在网上找不到任何信息。

我也尝试自动发现任务,但遇到了同样的问题:

celeryApp.autodiscover_tasks(['stream_framework', 'formshare.processes.feeds'],force=True)

高度赞赏任何想法。

【问题讨论】:

  • 经过数小时的反复试验,我注意到如果我使用 Gunicorn 启动我的应用程序会失败,但如果我使用 Waitress 则可以。因此发布:stackoverflow.com/questions/52378871/…

标签: celery celery-task stream-framework


【解决方案1】:

共享任务是用于在不同应用程序之间实际共享任务的特定事物(我认为主要是 Django 应用程序,但我在烧瓶中使用它们)。

我们遇到了同样的问题,为了让它发挥作用,我们设置了

 celery_app.set_default()

关于芹菜的实例化

否则另一种解决问题的方法是通过应用程序本身实际调用任务,因此围绕这些行进行一些操作

from celery import current_app
.
.
.
current_app.tasks['my.tasks.to.exec'].delay(something)

这始终有效,因为它是一个共享任务,因此在您导入它时不会绑定到任何应用程序,在这种情况下,它属于配置为“current_app”的应用程序

【讨论】:

  • +1 set_default 解决方法对我有用。我很确定我们没有正确使用 Celery,但是在花了两天时间在网上搜索以找出正确的方法之后,这真的很方便,我现在会坚持下去。
猜你喜欢
  • 2021-04-01
  • 2019-10-17
  • 2021-03-15
  • 1970-01-01
  • 1970-01-01
  • 2016-03-31
  • 2020-08-25
  • 2017-09-28
  • 2017-08-07
相关资源
最近更新 更多