【问题标题】:How to configure celery for concurrent execution with multi-process?如何配置 celery 以实现多进程并发执行?
【发布时间】:2021-12-25 22:45:52
【问题描述】:

我有一个与外部 API 对话的任务,json 响应非常大,我必须多次进行此调用,然后进行进一步的 python 处理。为了减少耗时,我最初尝试过:

def make_call(*args, **kwargs):
    pass

def make_another(*args, **kwargs):
    pass

def get_calls():
    return make_call, make_another

def task(*args, **kwargs):
    procs = [Process(target=get_calls()[i], args=(,), 
    kwargs={}) for i in range(3)]
    _start = [proc.start() for proc in procs]
    _join = [proc.join() for proc in procs]

# 
transaction.on_commit(lambda: task.delay()) 

但是,我遇到了AssertionError: daemonic processes are not allowed to have child。通过额外的流程加快 celery 任务的最佳方法是什么?

【问题讨论】:

  • 为什么还要单独/子调用?这是一个异步任务,所以它是否需要一段时间并不重要
  • @lain 对这个问题有点不屑一顾,不是一个很好的论点,也不是对提问者有帮助。如果一个任务可以并行化,比如在多个 CPU 内核或多个 I/O 流上,你为什么要让它比必要的时间更长呢?它可能会对应用程序中的用户响应能力产生很大影响。比如说,一个用户等待 10 秒来处理一个长任务而不是 30 秒或更长时间。

标签: python django concurrency multiprocessing celery


【解决方案1】:

Celery worker 已经创建了许多进程。利用许多工作进程而不是创建子进程。您可以将工作委派给 celery 工人。这将导致更稳定/可靠的执行。

您既可以从您的客户端代码创建许多任务,也可以使用 celery 的原语,如 chainschords 来并行化工作。这些也可以与groups等其他原语组成。

例如,在您的场景中,您可能有两项任务:一项是进行 API 调用 make_api_call,另一项是解析响应 parse_response。您可以将它们链接在一起。

# chain another task when a task completes successfully
res = make_api_call.apply_async((0,), link=parse_response.s())

# chain syntax 1
result_1 = chain(make_api_call.s(1), parse_response.s())
# syntax 2 with | operator
result_b = make_api_call.s(2) | parse_response.s()

# can group chains
job = group([
   chain(make_api_call.s(i), parse_response.s()) 
   for i in range(3)
   ]
)
result = job.apply_async()

这只是一个通用示例。您可以创建任务并将它们组合成您的工作流程需要。请参阅:Canvas: Designing Work-flows 了解更多信息。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-04-14
    • 1970-01-01
    • 1970-01-01
    • 2016-02-07
    • 1970-01-01
    • 2018-06-11
    • 2019-02-06
    相关资源
    最近更新 更多