【发布时间】:2021-11-08 01:47:08
【问题描述】:
虽然 Python 支持多线程执行,但 GIL 会导致其中一个线程一次向前推进。然而,在我阅读了《Effective Python》之后,我在这本书中实现了管道示例。代码如下,整个过程分为3个阶段,分别是下载、调整大小和上传。
from threading import Thread
from queue import Queue
import time
# define operation functions for 3 stages: download, resize and upload
def download(item):
print("downloading {}".format(item))
time.sleep(2)
return item
def resize(item):
print("resizing {}".format(item))
time.sleep(3)
return item
def upload(item):
print("uploading {}".format(item))
time.sleep(5)
return item
# subclass of Thread
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super(StoppableWorker, self).__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
# subclass of Queue
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return
yield item
finally:
self.task_done()
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
output_queue = ClosableQueue()
threads = [StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, output_queue)]
for t in threads:
t.start()
st = time.time()
for i in range(10):
download_queue.put(i)
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print("It took {}".format(time.time() - st))
print(output_queue.qsize(), 'items finished')
根据输出时间,我发现N个任务花费的时间是10 + (N-1)*5秒(10 = 2(下载阶段)+ 3(调整大小阶段)+ 5(上传阶段))。好像这3个线程可以同时工作,这与上面的描述相矛盾的是,每次只有一个线程可以向前推进
【问题讨论】:
-
是的,
GIL导致在任何一个时刻只执行一个线程,但 所有 线程可以同时sleep()。这就是为什么线程对任何阻塞(如网络访问)都很有用的原因。 -
@quamrana 谢谢你的解释!
标签: python multithreading queue pipeline