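"So these recommendations aren't really recommendations, because otherwise the event loop will block. We would then lose the main benefit of event-driven programming, correct?"
The event loop will block if you call a blocking function (I/O-blocking or CPU-blocking) directly in a coroutine instead of awaiting it through an executor. In that respect, yes, you shouldn't let that happen.
What I'd say is that there is an executor for each type of blocking code: use ProcessPoolExecutor for CPU-bound work and ThreadPoolExecutor for I/O-bound work.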
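To make the difference visible without any network access, here's a minimal sketch. It assumes time.sleep is a fair stand-in for a blocking call; the names blocking_call, heartbeat and count_ticks are made up for illustration. A background "heartbeat" task counts how often the event loop gets a chance to run while the blocking call is in progress.

```python
import asyncio
import time


def blocking_call(seconds: float = 0.3) -> None:
    # Assumption: time.sleep stands in for any blocking function.
    time.sleep(seconds)


async def heartbeat(ticks: list, interval: float = 0.05) -> None:
    # Records a timestamp every `interval` seconds while the loop is free.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def count_ticks(use_executor: bool) -> int:
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if use_executor:
        loop = asyncio.get_running_loop()
        # Passing None selects the loop's default ThreadPoolExecutor.
        await loop.run_in_executor(None, blocking_call)
    else:
        blocking_call()  # called directly: the loop is stuck until it returns
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)


async def demo() -> tuple:
    direct = await count_ticks(use_executor=False)
    offloaded = await count_ticks(use_executor=True)
    return direct, offloaded


if __name__ == "__main__":
    direct, offloaded = asyncio.run(demo())
    print(f"ticks while blocking directly: {direct}")
    print(f"ticks while using an executor: {offloaded}")
```

Called directly, the blocking function leaves the heartbeat with only its initial tick. Through the executor, the heartbeat keeps ticking for the whole 0.3 seconds.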
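"Running an I/O-bound task in a separate thread relies on the following assumption: the I/O call releases the GIL, correct? Otherwise the OS would not be able to context switch between the event loop and this new thread."
When it comes to multithreading, CPython switches between threads after a very short amount of time even if the running thread never gives up the GIL voluntarily: the interpreter forces a hand-off once the switch interval elapses (5 ms by default, see sys.getswitchinterval()). But if one or more threads are blocked on I/O (or are running C code that releases the GIL), the GIL is released right away, which lets the interpreter spend more time on the threads that need it.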
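A rough way to see the "GIL is released while waiting" part for yourself, sketched under the assumption that time.sleep behaves like a blocking I/O call in this respect (wait_like_io and run_in_threads are hypothetical names): if the waits overlapped only one at a time, four threads waiting 0.2 s each would need about 0.8 s.

```python
import sys
import threading
import time


def wait_like_io(seconds: float) -> None:
    # time.sleep releases the GIL while it waits, much like a blocking read.
    time.sleep(seconds)


def run_in_threads(n_threads: int, seconds: float) -> float:
    # Starts n_threads waiting threads and returns the total wall-clock time.
    threads = [
        threading.Thread(target=wait_like_io, args=(seconds,))
        for _ in range(n_threads)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"switch interval: {sys.getswitchinterval()} s")
    elapsed = run_in_threads(4, 0.2)
    print(f"4 threads x 0.2 s of waiting took {elapsed:.2f} s")
```

The total should land close to 0.2 s rather than 0.8 s, because none of the waiting threads holds the GIL while it sleeps.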
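Bottom line:
- You can run any blocking code in an executor and it won't block the event loop. You get concurrency, but you may or may not gain performance.
- For example, if you run CPU-bound code in a ThreadPoolExecutor, you won't get a performance benefit from the concurrency because of the GIL. To gain performance for CPU-bound work, use a ProcessPoolExecutor.
- I/O-bound code, on the other hand, can run in a ThreadPoolExecutor and you do gain performance. There's no need for the heavier ProcessPoolExecutor here.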
I wrote an example to demonstrate how this works:
import sys
import asyncio
import time
import concurrent.futures
import requests  # third-party: pip install requests
from contextlib import contextmanager

process_pool = concurrent.futures.ProcessPoolExecutor(2)
thread_pool = concurrent.futures.ThreadPoolExecutor(2)


def io_bound():
    for i in range(3):
        requests.get("https://httpbin.org/delay/0.4")  # I/O blocking
        print(f"I/O bound {i}")
        sys.stdout.flush()


def cpu_bound():
    for i in range(3):
        sum(i * i for i in range(10 ** 7))  # CPU blocking
        print(f"CPU bound {i}")
        sys.stdout.flush()


async def run_as_is(func):
    # Calls the blocking function directly inside the coroutine (the wrong way)
    func()


async def run_in_process(func):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(process_pool, func)


async def run_in_thread(func):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(thread_pool, func)


@contextmanager
def print_time():
    # Prints the wall-clock time spent inside the with block
    start = time.time()
    yield
    finished = time.time() - start
    print(f"Finished in {round(finished, 1)}\n")


async def main():
    print("Wrong due to blocking code in coroutine,")
    print(
        "you get neither performance, nor concurrency (which breaks async nature of the code)"
    )
    print("don't allow this to happen")
    with print_time():
        await asyncio.gather(run_as_is(cpu_bound), run_as_is(io_bound))

    print("CPU bound works concurrently with threads,")
    print("but you gain no performance due to GIL")
    with print_time():
        await asyncio.gather(run_in_thread(cpu_bound), run_in_thread(cpu_bound))

    print("To get performance for CPU-bound,")
    print("use process executor")
    with print_time():
        await asyncio.gather(run_in_process(cpu_bound), run_in_process(cpu_bound))

    print("I/O bound will gain benefit from processes as well...")
    with print_time():
        await asyncio.gather(run_in_process(io_bound), run_in_process(io_bound))

    print(
        "... but there's no need in processes since you can use lighter threads for I/O"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_thread(io_bound))

    print("Long story short,")
    print("Use processes for CPU bound due to GIL")
    print(
        "and use threads for I/O bound since you benefit from concurrency regardless of GIL"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_process(cpu_bound))


if __name__ == "__main__":
    asyncio.run(main())
Output:
Wrong due to blocking code in coroutine,
you get neither performance, nor concurrency (which breaks async nature of the code)
don't allow this to happen
CPU bound 0
CPU bound 1
CPU bound 2
I/O bound 0
I/O bound 1
I/O bound 2
Finished in 5.3

CPU bound works concurrently with threads,
but you gain no performance due to GIL
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 4.6

To get performance for CPU-bound,
use process executor
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 2.5

I/O bound will gain benefit from processes as well...
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.3

... but there's no need in processes since you can use lighter threads for I/O
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.1

Long story short,
Use processes for CPU bound due to GIL
and use threads for I/O bound since you benefit from concurrency regardless of GIL
CPU bound 0
I/O bound 0
CPU bound 1
I/O bound 1
CPU bound 2
I/O bound 2
Finished in 2.9
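
The functions in the example above take no arguments and return nothing. In real code you usually want a result back, so here's a small sketch of that, with a made-up fetch_length function (and time.sleep standing in for the I/O) purely for illustration. run_in_executor only forwards positional arguments, so the keyword argument is bound with functools.partial.

```python
import asyncio
import concurrent.futures
import functools
import time


def fetch_length(text: str, delay: float = 0.1) -> int:
    # Hypothetical blocking function: sleeps to imitate I/O, then returns a value.
    time.sleep(delay)
    return len(text)


async def gather_lengths(words: list) -> list:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            # Keyword arguments have to be bound up front with partial.
            loop.run_in_executor(
                pool, functools.partial(fetch_length, word, delay=0.1)
            )
            for word in words
        ]
        # gather returns results in the order the awaitables were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(gather_lengths(["async", "io", "executor"])))  # [5, 2, 8]
```

Awaiting the future returned by run_in_executor gives you the function's return value, and an exception raised inside the function is re-raised at the await.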