【问题标题】:How to create tasks with different concurrencies in Celery?如何在 Celery 中创建具有不同并发的任务?
【发布时间】:2019-09-27 03:54:34
【问题描述】:

以下任务有不同的要求。 slow 任务应该一个接一个地执行。 fast 任务可以做同样的事情(或更多),但应该独立于slow 任务。

我将它们绑定到不同的队列,但全局并发仍然适用于两者。如果它是1,那么一个任务要么从两个队列之一中拉出,要么如果它是>1,那么慢速任务可以有多个不应该执行的任务。

@app.task
def slow(x, y):
    # slow operation, only one at a time with concurrency of 1
    heavy_operation_takes_10_minutes()


@app.task
def fast(x, y):
    # can also be concurrency of 1 or more, as long as independent of `slow` task
    # ...
    return x + y

Celery 可以做到这一点吗?

【问题讨论】:

  • 您可以实现锁定,您可以将慢速任务作为链的一部分调用...您如何以及为什么调用这些任务?
  • 我也想过这个问题,但是锁定会从队列中拉出任务,同时可以被其他worker执行。
  • 在最坏的情况下,slow 任务函数被执行 N 次,其中 N-1 被锁定,fast 任务还在等着拿一个
  • 如果你有一个可靠的锁定机制,一次只允许一个任务获取锁,你可以“重试”没有获取锁的任务,这会将它放回队列中
  • 如果你必须避免把任务放到队列的后面——“重试”。那么等待锁定将是你最好的选择

标签: python rabbitmq celery amqp


【解决方案1】:

您可以做的是将任务放入不同的队列(据我了解,您已经这样做了),然后运行两个不同的 celery 工作程序以不同的并发性来监听这些队列。例如:

# non-concurrent worker for slow tasks
celery worker -A proj -Q queue-for-slow-tasks --concurrency=1 
# concurrent worker for fast tasks
celery worker -A proj -Q queue-for-fast-tasks

【讨论】:

  • 不幸的是,这将导致两个进程。看来我无法解决这个问题,甚至应该考虑os.fork()
  • 是的,这是两个独立的进程。为什么会出现问题?
  • 我想使用fast 消息发送信息,告知应该取消已经运行的slow 任务
  • 你打算怎么做?使用 revoke - docs.celeryproject.org/en/latest/reference/… ?可以在不同的进程中运行工作人员,它们通过消息代理进行通信,并且撤销应该可以正常工作。
猜你喜欢
  • 2022-10-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-23
  • 2014-12-02
  • 2021-09-27
  • 2019-04-20
  • 2023-03-25
相关资源
最近更新 更多