【问题标题】:Why pipeline based on multithreading in python can improve efficiency为什么python中基于多线程的管道可以提高效率
【发布时间】:2021-11-08 01:47:08
【问题描述】:

虽然 Python 支持多线程执行,但 GIL 会导致其中一个线程一次向前推进。然而,在我阅读了《Effective Python》之后,我在这本书中实现了管道示例。代码如下,整个过程分为3个阶段,分别是下载、调整大小和上传。

from threading import Thread
from queue import Queue
import time

# define operation functions for 3 stages: download, resize and upload
def download(item):
    print("downloading {}".format(item))
    time.sleep(2)
    return item
    
def resize(item):
    print("resizing {}".format(item))
    time.sleep(3)
    return item
    
def upload(item):
    print("uploading {}".format(item))
    time.sleep(5)
    return item

# subclass of Thread
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super(StoppableWorker, self).__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

# subclass of Queue
class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
output_queue = ClosableQueue()
threads = [StoppableWorker(download, download_queue, resize_queue),
           StoppableWorker(resize, resize_queue, upload_queue),
           StoppableWorker(upload, upload_queue, output_queue)]

for t in threads:
    t.start()

st = time.time()
for i in range(10):
    download_queue.put(i)
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print("It took {}".format(time.time() - st))
print(output_queue.qsize(), 'items finished')

根据输出时间,我发现N个任务花费的时间是10 + (N-1)*5秒(10 = 2(下载阶段)+ 3(调整大小阶段)+ 5(上传阶段))。好像这3个线程可以同时工作,这与上面的描述相矛盾的是,每次只有一个线程可以向前推进

【问题讨论】:

  • 是的,GIL 导致在任何一个时刻只执行一个线程,但 所有 线程可以同时 sleep()。这就是为什么线程对任何阻塞(如网络访问)都很有用的原因。
  • @quamrana 谢谢你的解释!

标签: python multithreading queue pipeline


【解决方案1】:
  • I/O 任务中的线程化:

线程正在改变游戏规则,因为许多与网络/数据 I/O 相关的脚本大部分时间都在等待来自远程源的数据。因为下载可能没有链接(即,抓取单独的网站),处理器可以从不同的数据源并行下载并在最后组合结果。对于 CPU 密集型进程,使用 threading 模块几乎没有什么好处。

【讨论】:

    猜你喜欢
    • 2011-06-20
    • 1970-01-01
    • 2022-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多