【发布时间】:2011-03-16 09:02:56
【问题描述】:
我正在使用以下模式进行多处理:
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
outQ.put('STOP')
for result in iter(outQ.get, 'STOP'):
# save result
效果很好。但是如果我通过outQ 发送一个numpy 数组,'STOP' 不会在outQ 的末尾结束,导致我的结果获取循环提前终止。
这是一些重现该行为的代码。
import multiprocessing
import numpy as np
def worker(inQ, outQ):
for i in iter(inQ.get, 'STOP'):
result = np.random.rand(1,100)
outQ.put(result)
inQ.task_done()
inQ.task_done() # for the 'STOP'
def main():
nProcesses = 8
data = range(1000)
inQ = multiprocessing.JoinableQueue()
outQ = multiprocessing.Queue()
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
print outQ.qsize()
outQ.put('STOP')
cnt = 0
for result in iter(outQ.get, 'STOP'):
cnt += 1
print "got %d items" % cnt
print outQ.qsize()
if __name__ == '__main__':
main()
如果您将result = np.random.rand(1,100) 替换为result = i*i 之类的内容,则代码将按预期工作。
这里发生了什么?我在这里做一些根本错误的事情吗?我原以为inQ.join() 之后的outQ.put() 会做我想做的事,因为join() 会阻塞,直到所有进程都完成所有put()s。
对我有用的解决方法是使用while outQ.qsize() > 0 进行结果获取循环,这可以找到。但我读到qsize() 不可靠。只有在不同的进程运行时才不可靠吗?在完成inQ.join() 之后,我依靠qsize() 是否安全?
我希望有些人建议使用 multiprocessing.Pool.map(),但是在使用 numpy 数组 (ndarrays) 时,我遇到了 pickle 错误。
感谢观看!
【问题讨论】:
-
您是否对其进行了测试以查看普通的旧 threading.Thread 是否存在相同的问题?
-
我首先使用线程,但我用它来读取 tar.gz,这是在 python 中实现的,这意味着由于 GIL,线程将无济于事。使用线程和 Queue.Queue 似乎可以工作,是的。
标签: python numpy queue multiprocessing task-queue