【发布时间】:2012-03-24 23:59:58
【问题描述】:
我对 python 还很陌生。 我正在使用多处理模块读取标准输入上的文本行,以某种方式转换它们并将它们写入数据库。这是我的代码的 sn-p:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
现在一切正常,直到我开始处理巨大的输入文件(数亿行),然后通过管道传输到我的 python 程序中。在某些时候,当我的数据库变慢时,我会看到内存已满。
玩了一会儿,发现 pool.apply_async 和 pool.map_async 从来没有阻塞过,所以要处理的调用队列越来越大。
解决我的问题的正确方法是什么?我希望我可以设置一个参数,一旦达到某个队列长度,它将阻止 pool.apply_async 调用。 Java 中的 AFAIR 可以为此目的为 ThreadPoolExecutor 提供一个具有固定长度的 BlockingQueue。
谢谢!
【问题讨论】:
-
“原来 pool.apply_async 和 pool.map_async 永远不会阻塞” - 我一直在寻找的一切
标签: python design-patterns queue multiprocessing python-multiprocessing