【问题标题】:Python, invoke a process pool without blocking the event loopPython,调用进程池而不阻塞事件循环
【发布时间】:2019-04-17 16:25:52
【问题描述】:

如果我运行以下代码:

import asyncio
import time
import concurrent.futures

def cpu_bound(mul):
    for i in range(mul*10**8):
        i+=1
    print('result = ', i)
    return i

async def say_after(delay, what):
    print('sleeping async...')
    await asyncio.sleep(delay)
    print(what)

# The run_in_pool function must not block the event loop
async def run_in_pool():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = executor.map(cpu_bound, [1, 1, 1])

async def main():
    task1 = asyncio.create_task(say_after(0.1, 'hello'))
    task2 = asyncio.create_task(run_in_pool())
    task3 = asyncio.create_task(say_after(0.1, 'world'))

    print(f"started at {time.strftime('%X')}")
    await task1
    await task2
    await task3
    print(f"finished at {time.strftime('%X')}")

if __name__ == '__main__':
    asyncio.run(main())

输出是:

started at 18:19:28
sleeping async...
result =  100000000
result =  100000000
result =  100000000
sleeping async...
hello
world
finished at 18:19:34

这表明事件循环会阻塞,直到 cpu 绑定作业 (task2) 完成,然后它会继续使用 task3

如果我只运行一个 cpu 绑定作业(run_in_pool 是以下一个):

async def run_in_pool():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, cpu_bound, 1)

然后似乎事件循环没有阻塞,因为输出是:

started at 18:16:23
sleeping async...
sleeping async...
hello
world
result =  100000000
finished at 18:16:28

如何在进程池中运行多个 cpu 绑定作业(task2)而不阻塞事件循环?

【问题讨论】:

  • 实际上这个主题的正确问题是:如何模拟 executor.map() 方法,使其可以等待,这样它就不会阻塞事件循环。

标签: python python-multiprocessing python-asyncio coroutine event-loop


【解决方案1】:

正如您所发现的,您需要使用 asyncio 自己的 run_in_executor 来等待提交的任务完成而不阻塞事件循环。 Asyncio 不提供 map 的等价物,但模仿它并不难:

async def run_in_pool():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [loop.run_in_executor(executor, cpu_bound, i)
                   for i in (1, 1, 1)]
        result = await asyncio.gather(*futures)

【讨论】:

  • 谢谢,这正是我想要的。本质上,您需要使用 run_in_pool 协程中的 await 语句将控制权交还给事件循环。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-18
相关资源
最近更新 更多