【问题标题】:Concurrent asynchronous processes with Python, Flask and Celery使用 Python、Flask 和 Celery 的并发异步进程
【发布时间】:2013-01-29 17:06:55
【问题描述】:

我正在开发一个小型但计算密集型的 Python 应用程序。计算密集型工作可以分成几个可以同时执行的部分。我正在尝试确定一个合适的堆栈来完成此操作。

目前我计划在 Apache2+WSGI 上使用 Flask 应用程序和 Celery 作为任务队列。

在下面,如果有 3 个或更多工作人员可用,a_long_process()another_long_process()yet_another_long_process() 会同时执行吗?进程执行时 Flask 应用会被阻塞吗?

来自 Flask 应用程序:

@myapp.route('/foo')
def bar():
    task_1 = a_long_process.delay(x, y)
    task_1_result = task_1.get(timeout=1)
    task_2 = another_long_process.delay(x, y)
    task_2_result = task_2.get(timeout=1)
    task_3 = yet_another_long_process.delay(x, y)
    task_3_result = task_3.get(timeout=1)
    return task_1 + task_2 + task_3

tasks.py:

from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
    return something
@celery.task
def another_long_process(x, y):
    return something_else
@celery.task
def yet_another_long_process(x, y):
    return a_third_thing

【问题讨论】:

    标签: python asynchronous flask celery


    【解决方案1】:

    您应该更改代码,以便工作人员可以并行工作:

    @myapp.route('/foo')
    def bar():
        # start tasks
        task_1 = a_long_process.delay(x, y)
        task_2 = another_long_process.delay(x, y)
        task_3 = yet_another_long_process.delay(x, y)
        # fetch results
        try:
            task_1_result = task_1.get(timeout=1)
            task_2_result = task_2.get(timeout=1)
            task_3_result = task_3.get(timeout=1)
        except TimeoutError:
            # Handle this or don't specify a timeout.
            raise
        # combine results
        return task_1 + task_2 + task_3
    

    此代码将阻塞,直到所有结果都可用(或达到超时)。

    在进程执行时会阻塞 Flask 应用程序吗?

    此代码只会阻止您的 WSGI 容器的一个工作人员。整个站点是否无响应取决于您使用的 WSGI 容器。 (例如 Apache + mod_wsgi、uWSGI、gunicorn 等)大多数 WSGI 容器会产生多个工作人员,因此在您的代码等待任务结果时只有一个工作人员会被阻塞。

    对于这种应用程序,我建议使用gevent,它为每个请求生成一个单独的greenlet,并且非常轻量级。

    【讨论】:

    • 将超时设置为 1 似乎真的很hacky,但似乎这是避免阻塞的唯一方法?现在我的浏览器似乎永远等到后台任务完成。 +1
    【解决方案2】:

    根据result.get() 的文档,它会等到结果准备好后再返回,所以通常它实际上是阻塞的。但是,由于您有 timeout=1,如果任务完成时间超过 1 秒,对 get() 的调用将引发 TimeoutError。

    默认情况下,Celery 工作线程以设置equal to the number of CPUs available 的并发级别开始。并发级别似乎决定了可用于处理任务的线程数。因此,在并发级别 >= 3 的情况下,Celery worker 似乎应该能够同时处理这么多任务(也许具有更高 Celery 专业知识的人可以验证这一点?)。

    【讨论】:

      【解决方案3】:

      使用 celery canvas 的Group 功能:

      组原语是一个签名,它采用应并行应用的任务列表。

      这是文档中提供的示例:

      from celery import group
      from proj.tasks import add
      
      g = group(add.s(2, 2), add.s(4, 4))
      res = g()
      res.get()
      

      哪个输出[4, 8]

      【讨论】:

      • 为什么还要使用 Celery,而您也使用 get?在这种情况下,这与在没有芹菜的情况下调用函数相同,我猜....我错过了什么?
      • 我不完全理解你的问题,但这里是:文档指出A group is lazy so you must call it to take action and evaluate the group.。这就是res = g() 正在做的事情——它调用组以便包含的任务运行(并行)。 resGroupResultget 返回每​​个包含任务的结果(文档有很好的例子)。
      • 也许我错了,但是当你调用 get 时它会阻塞,直到它有响应返回。那正确吗?如果是,为什么不直接调用函数而不使用 celery?这种情况下用芹菜有什么好处?
      • 优点是该组将并行运行所有包含的任务。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-05-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多