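If you're calling ThreadPoolExecutor from async code, you should go through asyncio's loop.run_in_executor() function; otherwise the call blocks the main event loop.
If the extra workload is CPU-bound, you should also switch to ProcessPoolExecutor.
Example from the Python docs: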
import asyncio
import concurrent.futures


def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)


# The guard matters with a process pool: worker processes may re-import
# this module, and must not call asyncio.run() again when they do.
if __name__ == '__main__':
    asyncio.run(main())
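For the I/O-bound case from the first sentence, the same pattern works with a thread pool. This is only a minimal sketch: blocking_io and run_blocking_calls are hypothetical names, and time.sleep() stands in for whatever blocking call you actually have.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(delay):
    # Hypothetical stand-in for a blocking call (file read, legacy client, ...).
    time.sleep(delay)
    return delay


async def run_blocking_calls(delays):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # run_in_executor() returns awaitable futures, so the event loop
        # stays free while the worker threads are sleeping.
        futures = [loop.run_in_executor(pool, blocking_io, d) for d in delays]
        # gather() returns the results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_blocking_calls([0.2, 0.2, 0.2]))
    print(results, f"(elapsed: {time.perf_counter() - start:.1f}s)")
```

Because the three calls run in separate worker threads, the total elapsed time should be close to the longest single delay rather than the sum.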
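As for max_workers, the default is usually just fine (in Python 3.8 through 3.12, min(32, os.cpu_count() + 4) threads for ThreadPoolExecutor and one process per CPU for ProcessPoolExecutor).
The right value depends on your workload (CPU-bound vs. I/O-bound), but for CPU-bound tasks there is no point in setting it higher than the number of available CPUs, because the extra context switching can actually make performance worse.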
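If you do want to set max_workers explicitly for CPU-bound work, one way to apply the rule above is to cap the requested value at the CPU count. cpu_bound_workers is a hypothetical helper written for illustration, not part of concurrent.futures.

```python
import os


def cpu_bound_workers(requested=None):
    """Cap a requested worker count at the CPU count reported by the OS."""
    # os.cpu_count() can return None when the count cannot be determined.
    available = os.cpu_count() or 1
    if requested is None:
        return available
    # Never return less than one worker or more than the available CPUs.
    return max(1, min(requested, available))


if __name__ == "__main__":
    # The result can be passed as ProcessPoolExecutor(max_workers=...).
    print("CPUs reported:", os.cpu_count())
    print("workers for a request of 64:", cpu_bound_workers(64))
```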
Both executors use a queue to hold submitted tasks and dispatch them to the threads/processes as they become available.
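The queueing is easy to observe by submitting more tasks than there are workers. The sketch below uses a hypothetical run_queued helper that counts how many tasks run at the same time; it only illustrates the behaviour and says nothing about how the executors are implemented internally.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_queued(n_tasks, max_workers, delay=0.05):
    """Submit n_tasks to a small pool; return results and peak concurrency."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def work(i):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(delay)  # stand-in for real work
        with lock:
            running -= 1
        return i

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately; tasks beyond max_workers wait
        # until a worker thread is free.
        futures = [pool.submit(work, i) for i in range(n_tasks)]
        results = [f.result() for f in futures]
    return results, peak


if __name__ == "__main__":
    results, peak = run_queued(n_tasks=6, max_workers=2)
    print(results, "peak concurrency:", peak)
```

All six tasks complete, but the peak concurrency never exceeds max_workers: the remaining tasks simply wait their turn.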
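UPDATE: Thu 25 Mar 2021 15:17:51 UTC
The asyncio event loop is single-threaded, so you will see the problem as soon as you schedule other coroutines at the same time. As you can see below, the "none-blocking" task was held up by the "blocking executor" task for 10 seconds: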
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
END none-blocking: (elapsed: 10.0s)
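If you run it a few times, the "blocking executor" task will sometimes start first, and in that case the "none-blocking" task won't even start until the "blocking executor" task has finished: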
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
When you comment out the "blocking executor" task, you can see that all the calls now run asynchronously:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
END none-blocking executor: (elapsed: 5.0s)
The key takeaway is that once you start writing async code, you can't mix in synchronous, blocking calls on the event loop thread.
test.py:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking(msg, t):
    # Synchronous work: time.sleep() blocks whichever thread runs it.
    t1 = time.perf_counter()
    print(f"START {msg}: (scheduled: {t}s)")
    time.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task1(msg, t):
    # Proper coroutine: asyncio.sleep() yields control to the event loop.
    t1 = time.perf_counter()
    print(f"START {msg}: (scheduled: {t}s)")
    await asyncio.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task2(msg, t):
    # Deliberately wrong: future.result() is a synchronous wait, so the
    # event loop thread is stuck here until the worker thread finishes.
    with ThreadPoolExecutor() as executor:
        future = executor.submit(blocking, msg, t)
        future.result()


async def main():
    loop = asyncio.get_running_loop()
    aws = [
        task1("none-blocking", 1.0),
        loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
        task2("blocking executor", 10.0),
    ]
    for coro in asyncio.as_completed(aws):
        await coro


if __name__ == "__main__":
    asyncio.run(main())
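For comparison, here is one way task2 could be written so that it no longer blocks the loop. Treat it as a sketch rather than the only fix: task2_fixed, ticker and demo are hypothetical names, and the delays are shortened so it runs quickly.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking(t):
    # Synchronous work that must not run on the event loop thread.
    time.sleep(t)


async def task2_fixed(t, events):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Awaiting the executor future suspends this coroutine instead of
        # blocking the event loop the way future.result() does.
        await loop.run_in_executor(executor, blocking, t)
    events.append("done")


async def ticker(interval, count, events):
    # A second coroutine that only makes progress if the loop is free.
    for _ in range(count):
        await asyncio.sleep(interval)
        events.append("tick")


async def demo():
    events = []
    await asyncio.gather(task2_fixed(0.5, events), ticker(0.05, 3, events))
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The ticks are recorded while the blocking call is still running in the worker thread, which is exactly what the original task2 prevents. On Python 3.9+, `await asyncio.to_thread(blocking, t)` is a shorter alternative when the default thread pool is good enough.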