【问题标题】:How to limit the workers' throughput in imap_unrdered?如何限制 imap_unordered 中工作人员的吞吐量?
【发布时间】:2015-01-30 12:21:47
【问题描述】:
我正在使用来自multiprocessing 库的imap_unordered 来并行化一些数据处理计算。问题在于,有时读取返回的迭代器的主进程处理计算结果的速度比工作人员生成它们的速度慢(网络/磁盘速度限制等)。这会导致程序消耗所有可用内存并崩溃。
我希望内部迭代器有一些内部大小限制,因此当返回的迭代器处理速度太慢时,内部队列会变满并阻塞生产者(异步工作者)。但显然情况并非如此。
实现这种行为的最简单方法是什么?
【问题讨论】:
标签:
python
multiprocessing
throughput
python-multiprocessing
【解决方案1】:
您可能需要考虑使用Queue:
import multiprocessing # Don't use queue.Queue!
MAX_QUEUE_SIZE = 20
q = multiprocessing.Queue(MAX_QUEUE_SIZE) # Inserts will block if the queue is full
然后,在您的主进程中:
while 1:
do_something_with(q.get())
在您的子进程中:
while 1:
q.put(create_something())
您将不得不重写一些机制(即您不能再使用imap_unordered),但是使用Pool 的低级方法应该是相当简单的。