【问题标题】:Celery subtask bound to different application executed remotelyCelery 子任务绑定到远程执行的不同应用程序
【发布时间】:2014-02-07 20:05:14
【问题描述】:

我有两个配置不同的 Celery 应用程序实例,比如说 app1 和 app2。来自 app1 的任务在远程 worker1 上执行。来自 app2 的任务在远程 worker2 上执行。

我正在尝试使用 app2 中的子任务执行 app1 中的任务,但子任务没有得到执行。

我的代码是这样的:

import app1.tasks as app1_tasks
import app2.tasks as app2_tasks

reference = 'foo'

success_callback = app2_tasks.bar.subtask(reference)

app1_tasks.foo.apply_async(
    args=(
        1, 2, 3, reference,
    ),
    link= success_callback,
)

有什么方法可以让这个工作吗?我需要有两个不同的 Celery 应用程序。

回调任务没有运行。我想这是因为它使用了不同的应用程序实例?

重要提示:这两个任务都在不同的工作人员上远程执行!这就是我有两个应用程序实例的原因之一。

【问题讨论】:

  • 应用未序列化。子任务是一个签名对象,它描述了任务的参数和执行选项,当发送应用程序实例时,它会被删除,因为这是一个很大的复杂结构,序列化成本很高(例如,肯定不支持 json) .工作人员将收到子任务并使用启动它的应用程序。所以:应用程序是本地的,它们仅定义 celery 在该过程中的工作方式。
  • 注意:子任务将被发送,但可能与您预期的路由设置不同。如果 app2 定义了与 app1 不同的 CELERY_ROUTES,并且 worker 使用 app1 启动,那么 app2 中的路由不会对 worker 产生任何影响。我想如果子任务提前继承路由,这是一个可以解决的问题(但这也可能与其他人期望它的工作方式发生冲突)

标签: python celery


【解决方案1】:

您可以将应用分开并让它们通过以下方式进行通信:

1.让每个工作人员监听不同的队列

$ celery worker -Q feeds

2. 使用 routing method 向特定应用发送消息,来自文档的手动示例:

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

3.call task by name 并保持应用代码分离,形成文档:

from celery.execute import send_task
send_task("tasks.add", args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>

【讨论】:

  • 如果是subtasks,我该如何使用send_taskapp.send_task('myapp.send_push_notification', (json.dumps(payload1), ), link=app.send_task('differentapp.save_pn_response', (json.dumps(payload2), )))
猜你喜欢
  • 2016-09-23
  • 1970-01-01
  • 2017-04-14
  • 2012-08-09
  • 1970-01-01
  • 1970-01-01
  • 2019-08-03
  • 2020-07-01
  • 2020-06-05
相关资源
最近更新 更多