【问题标题】:Task priority in celery with redis带有redis的芹菜中的任务优先级
【发布时间】:2013-03-06 05:47:25
【问题描述】:

我想用 celery 实现一个分布式作业执行系统。鉴于 rabbitMQ 不支持优先级,而我非常需要这个功能,我转向了 celery+redis。

在我的情况下,任务与硬件密切相关,例如,任务 A 只能在 Worker 1 上运行,因为只有 Worker 1 的 PC 拥有必要的硬件。我将每个工作人员的 CONCURRENCY 设置为 1,以便工作人员每次只运行一个任务。每个任务大约需要 2 分钟。

为了实现优先级功能,首先我尝试在调用apply_async()时添加priority参数,例如apply_async(priority=0)apply_async(priority=9)。在这个测试中,我只启动了一个 COCURRENCY=1 的 Worker,并以不同的优先级一个一个地启动了 10 个任务。我希望看到apply_async(priority=0) 启动的任务将优先运行,但不幸的是它们只是作为启动顺序启动。

然后我尝试做一些工作。我克隆了每个任务,所以对于每个任务我都有 task_high 和 task_low,由 @celery.task(priority=0)@celery.task(priority=1) 装饰。然后我做了和上面一样的测试,这次更好,当启动顺序是“HH-LLLL-HHHH”时,真正的顺序是“HH-L-H-H-L-H-L-L-H”。我想redis在这里做了一些调度和平衡工作。

但这仍然不能满足我的期望。我希望得到类似“HHHHHH-LLLL”这样的订单,因为对于某些任务,我只有一台合适的机器和必要的硬件,并希望高优先级的任务尽快运行。

我在 Internet 上搜索了其他工作,例如使用两个队列,一个用于高优先级任务,另一个用于低优先级任务,前者使用 2 台机器,后者使用 1 台机器。但是由于我的硬件非常有限,这对我不起作用。

能否给点建议?

【问题讨论】:

    标签: python celery distributed


    【解决方案1】:

    Celery Redis 传输确实尊重优先级字段, 但 Redis 本身并没有优先级的概念。

    优先级支持是通过为每个队列创建 n 个列表来实现的 并在 BRPOP 命令中使用该顺序。 我在这里说n,因为即使有 10 (0-9) 个优先级,这些都是 默认合并为 4 个级别以节省资源。 这意味着名为celery 的队列实际上将被拆分为 4 个队列:

    ['celery0', 'celery3`, `celery6`, `celery9`]
    

    如果您想要更多优先级,您可以设置priority_steps 传输选项:

    BROKER_TRANSPORT_OPTIONS = {
        'priority_steps': list(range(10)),
    }
    

    也就是说,请注意,这永远不会像实施的优先事项那样好 在服务器级别,最多可能是近似值。但它可能仍然很好 足以满足您的应用需求。

    Source.

    【讨论】:

    • 感谢您提供详细信息,但我该怎么做才能得到我想要的订单?我应该只更改priority_steps 吗?
    • 您只需尝试一下即可。使用默认优先级步骤,那些 0 和 1 将仅合并为 0。
    • 好的,谢谢,我尝试将优先级 0 用于高任务,将优先级 9 用于低任务,现在看起来好多了,尽管有时某些优先级为 9 的任务仍然在优先级为 0 的任务之前运行。我不知道是我的配置有问题还是redis做了一些平衡工作。
    • 正如我所说,只要不在服务器上实现优先级,它并不完全可靠
    • 这不是正确的答案...并且您应该提到您已经复制+粘贴了文档...即使您复制了错误的部分 (docs.celeryproject.org/en/latest/userguide/… )
    【解决方案2】:

    关于redis消息优先级的Celery文档在这里redis-message-priorities,您可以自定义优先级。以10为例:

    1. 设置 priority_steps 传输选项
    app.conf.broker_transport_options = {
        'priority_steps': list(range(10)),
        'queue_order_strategy': 'priority',
    }
    
    1. 以正常方式启动 celery worker
    celery -A tasks worker --loglevel=info
    
    1. 调用任务, 0 为最高优先级,9 为最低优先级
    custom_priority=5 
    task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},priority=custom_priority)
    

    【讨论】:

      猜你喜欢
      • 2020-01-10
      • 1970-01-01
      • 2012-06-13
      • 1970-01-01
      • 1970-01-01
      • 2016-03-22
      • 2020-04-07
      • 1970-01-01
      • 2011-05-26
      相关资源
      最近更新 更多