【问题标题】:How to make a celery task call asynchronous tasks?如何让 celery 任务调用异步任务?
【发布时间】:2015-12-02 16:25:27
【问题描述】:

我有一个需要运行优化算法的 Django 应用程序。该算法由两部分组成。第一部分是进化算法,该算法调用了第二部分的一定数量的任务,即模拟退火算法。 问题是 celery 不允许任务调用异步任务。 我在下面尝试过这段代码:

            sa_list = []
            for cromossomo in self._populacao:
                sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

            job = group(sa_list)

            result = job.apply_async()
            resultados = result.get()

这段代码是进化算法的一部分,它是一个芹菜任务。 当我尝试运行它时,芹菜显示此消息:

[2015-12-02 16:20:15,970:警告/Worker-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45: RuntimeWarning:永远不要在任务中调用 result.get()! 见http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

在 Celery 3.2 中,这将导致异常 提出而不仅仅是一个警告。

尽管只是一个警告,芹菜似乎充满了任务和锁。

我搜索了很多解决方案,但都没有奏效。

【问题讨论】:

    标签: python asynchronous optimization celery django-celery


    【解决方案1】:

    解决此问题的一种方法是使用 2 阶段管道:

    def first_task():
        sa_list = []
        for cromossomo in self._populacao:
            sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))
    
        job = group(sa_list)
    
        result = job.apply_async()
        result.save()
        return result.id
    

    然后这样称呼它:

    from path.to.tasks import app, first_task
    
    result_1 = first_task.apply_async()
    result_2_id = result_1.get()
    result_2 = app.GroupResult.restore(result_2_id)
    resultados = result_2.get()
    

    还有其他方法可以做到这一点,需要更多的工作 - 您可以使用和弦来收集小组的结果。

    【讨论】:

    • 但是我在 first_task 中有一个循环,所以我不能分成两步管道。
    • 抱歉,这对我来说没有意义——你能用代码说明一下吗?
    【解决方案2】:

    问题不在于 celery 在您的示例中不允许执行异步任务,而是您会遇到死锁,因此会出现警告:

    假设您有一个任务 A,它通过apply_async() 产生了许多子任务 B。这些任务中的每一项都由工人执行。问题是,如果任务 B 的数量大于可用工作人员的数量,则任务 A 仍在等待他们的结果(至少在您的示例中 - 默认情况下不是)。当任务 A 仍在运行时,已执行任务 B 的工作人员将不会执行另一个任务,它们会被阻塞,直到任务 A 完成。 (我不知道具体原因,但几周前我遇到了这个问题。)

    这意味着 celery 无法执行任何操作,除非您手动关闭 worker。

    解决方案

    这完全取决于您将如何处理任务结果。如果您需要它们执行下一个子任务,您可以通过 Linking with callbacks 将它们链接起来,或者将其硬编码到相应的任务中(以便您调用第一个,调用第二个,依此类推)。

    如果您只需要查看它们是否执行以及是否成功,您可以使用flower 来监控您的任务。

    如果您需要进一步处理所有子任务的输出,我建议将结果写入 xml 文件:让任务 A 调用所有任务 B,一旦完成,您就执行处理结果的任务 C。也许有更优雅的解决方案,但这肯定会避免死锁。

    【讨论】:

      猜你喜欢
      • 2020-05-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-13
      • 2014-05-04
      • 2015-11-22
      • 2011-11-22
      • 1970-01-01
      相关资源
      最近更新 更多