【问题标题】:Celery task that runs more tasks运行更多任务的 Celery 任务
【发布时间】:2011-09-15 00:23:20
【问题描述】:

我正在使用 celerybeat 启动一项主要任务,该任务启动了许多次要任务。我已经写好了这两个任务。

有没有办法轻松做到这一点? Celery 是否允许在任务中运行任务?

我的例子:

@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async() # raises a IOError: Socket closed

@task
def compute_for_user(user_id):
    #do some stuff

compute 被 celerybeat 调用,但在尝试运行 apply_async 时会导致 IOError。有什么想法吗?

【问题讨论】:

标签: python django task celery


【解决方案1】:

回答您的开场问题:从 2.0 版开始,Celery 提供了一种从其他任务开始任务的简单方法。您所说的“次要任务”就是所谓的“子任务”。请参阅Sets of tasks, Subtasks and Callbacks 的文档,@Paperino 非常友好地提供了链接。

对于 3.0 版,Celery 更改为使用 groups 来处理这种和其他类型的行为。

您的代码表明您已经熟悉此界面。您的实际问题似乎是,“当我尝试运行我的一组子任务时,为什么我得到一个'套接字关闭'IOError?”我认为没有人可以回答这个问题,因为您没有提供有关您的程序的足够信息。您的摘录不能按原样运行,因此我们无法自行检查您遇到的问题。请发布IOError 提供的堆栈跟踪,如果运气好的话,可以帮助您解决崩溃问题的人会出现。

【讨论】:

    【解决方案2】:

    由于 3.0 版“任务集”不再是术语......组、链和和弦作为一种特殊类型的子任务是新事物,请参阅 http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks

    【讨论】:

      【解决方案3】:

      你可以使用这样的东西(在 3.0 中支持)

      g = group(compute_for_user.s(user.id) for user in users)
      g.apply_async()
      

      【讨论】:

      • 所以在你的实现中根本不需要“compute(users=None):”方法,是吗?
      • 如果您想等待子任务完成并返回结果,不建议这样做。
      【解决方案4】:

      对于提到的IOError,虽然这里的信息不足以说明是什么原因造成的,但我的猜测是你试图在任务函数内部建立一个连接,所以每当调用一个任务时,都会建立一个新的连接。如果任务被调用千次,就会有千次连接。这将淹没系统套接字管理器并且 IOError 是它的抱怨。

      【讨论】:

        猜你喜欢
        • 2017-02-02
        • 2018-02-10
        • 2015-10-24
        • 1970-01-01
        • 2019-02-16
        • 2014-03-17
        • 1970-01-01
        • 2023-03-24
        • 2011-05-05
        相关资源
        最近更新 更多