【问题标题】:Python-Celery increase concurrency on workerPython-Celery 增加了 worker 的并发性
【发布时间】:2017-03-21 05:45:37
【问题描述】:

TL;DR

是否可以增加或减少 正在运行的 Celery worker 不重新启动它的并发性?

我在 ubuntu 14.10 上使用 celery 4.0.0 和 RabbitMQ 作为代理

我的用例

我经常面临一大堆任务,其中大多数主要执行 HTTP-Request 并进行一些次要处理。我让工人在一台相当强大的机器上运行,并希望最大限度地利用它的资源。大多数时候这不是问题,除非处理这些大量的 HTTP 请求,这可能会超时或需要很长时间才能响应等。在处理这些时,我想暂时增加 @987654322 @-parameter,实际上不必重新启动工作器。

目前我正在使用--concurrency 150 运行 celery,但这只会使服务器瓶颈(CPU)的利用率达到约 10%。我想一种解决方案是在该时间段内产生另一个 150 并发工作人员并稍后将其杀死,但这可能会增加复杂性。如果可能的话,我想坚持使用 1 名工人/机器。

【问题讨论】:

  • 自动缩放选项怎么样:docs.celeryproject.org/en/latest/userguide/…
  • 原则上我更喜欢使用自动缩放的想法,而不是我想出的。但是,文档并不清楚如何对其进行子类化并向其添加您自己的逻辑。请注意,我现在坚持这样一个事实,即当尝试超过 169 个进程时,celery 到处都会抛出错误,但这可能是 github 的问题。

标签: python celery


【解决方案1】:

通过子类化可能可以使用内置的 celeries autoscaling(感谢 Philip Tzou)。不幸的是,芹菜自动缩放功能的文档记录很差。

但是,在进行了更多挖掘之后,我遇到了celery.app.control,它(除其他外)允许通过 RabbitMQ 向工作线程发送消息来进行扩展。下面是一个小例子,说明如何做到这一点:

import os, time
from celery import Celery
from celery.app.control import Control

app = Celery()
controller = Control(app)

while True:
    n=5 # the numer of processes to add/remove
    upper_load_threshold = 6
    lower_load_threshold = 4
    if os.getloadavg()[0] <= lower_load_threshold: # we're looking at the 5 min load avg here
        controller.pool_grow(n)
    elif os.getloadavg()[0] >= upper_load_threshold:
        controller.pool_shrink(n)
    time.sleep(10)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-21
    • 2016-03-28
    • 2021-12-22
    • 2015-07-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多