【问题标题】:Combination of numpy and multiprocessing Queues disturbs ordering of the queuenumpy 和多处理队列的组合扰乱了队列的顺序
【发布时间】:2011-03-16 09:02:56
【问题描述】:

我正在使用以下模式进行多处理:

    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    outQ.put('STOP')

    for result in iter(outQ.get, 'STOP'):
        # save result

效果很好。但是如果我通过outQ 发送一个numpy 数组,'STOP' 不会在outQ 的末尾结束,导致我的结果获取循环提前终止。

这是一些重现该行为的代码。

import multiprocessing
import numpy as np

def worker(inQ, outQ):
    for i in iter(inQ.get, 'STOP'):
        result = np.random.rand(1,100)
        outQ.put(result)
        inQ.task_done()
    inQ.task_done() # for the 'STOP'

def main():
    nProcesses = 8
    data = range(1000)

    inQ = multiprocessing.JoinableQueue()
    outQ = multiprocessing.Queue()
    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    print outQ.qsize()
    outQ.put('STOP')

    cnt = 0
    for result in iter(outQ.get, 'STOP'):
        cnt += 1
    print "got %d items" % cnt
    print outQ.qsize()

if __name__ == '__main__':
    main()

如果您将result = np.random.rand(1,100) 替换为result = i*i 之类的内容,则代码将按预期工作。

这里发生了什么?我在这里做一些根本错误的事情吗?我原以为inQ.join() 之后的outQ.put() 会做我想做的事,因为join() 会阻塞,直到所有进程都完成所有put()s。

对我有用的解决方法是使用while outQ.qsize() > 0 进行结果获取循环,这可以找到。但我读到qsize() 不可靠。只有在不同的进程运行时才不可靠吗?在完成inQ.join() 之后,我依靠qsize() 是否安全?

我希望有些人建议使用 multiprocessing.Pool.map(),但是在使用 numpy 数组 (ndarrays) 时,我遇到了 pickle 错误。

感谢观看!

【问题讨论】:

  • 您是否对其进行了测试以查看普通的旧 threading.Thread 是否存在相同的问题?
  • 我首先使用线程,但我用它来读取 tar.gz,这是在 python 中实现的,这意味着由于 GIL,线程将无济于事。使用线程和 Queue.Queue 似乎可以工作,是的。

标签: python numpy queue multiprocessing task-queue


【解决方案1】:

numpy 数组使用丰富的比较。所以 a=='STOP' 返回一个 numpy 数组,而不是 bool,并且该 numpy 数组不能被强制为 bool。在幕后,iter(outQ.get, 'STOP') 只是在进行比较,并且可能在尝试将结果转换为布尔值时将异常视为 False。您必须手动执行 while 循环,从队列中拉取项目,检查 isinstance(item, basestring),然后再将其与“STOP”进行比较。

while True:
    item = outQ.get()
    if isinstance(item, basestring) and item == 'STOP':
        break
    cnt += 1

检查 qsize() 可能也可以正常工作,因为在加入输入队列后没有其他进程添加到队列中。

【讨论】:

  • 它能为您解决问题吗?
  • 不,似乎还有其他事情发生。序列化结果项所花费的时间可能会干扰远程进程中的队列将它们推入主进程的能力。因此,在所有结果实际进入之前,您将“STOP”添加到主进程的队列中。尝试 outQ.put([i]*1000) ,您将看到相同的效果。尝试“while not outQ.empty():”而不是使用 'STOP' 哨兵。避免使用 .qSize(),因为它不适用于所有平台。
  • 你是对的 [i]*1000。我以为序列化会完成,当程序执行inQ.task_done()时,为什么不是这样?我试过empty(),但也失败了。有时qsize() > 0 是真的。我不明白幕后发生了什么。
【解决方案2】:

由于您知道从 outQ 获得多少项目,另一种解决方法是明确等待该数量的项目:

import multiprocessing as mp
import numpy as np
import Queue

N=100

def worker(inQ, outQ):
    while True:
        i,item=inQ.get()
        result = np.random.rand(1,N)
        outQ.put((i,result))
        inQ.task_done()

def main():
    nProcesses = 8
    data = range(N)
    inQ = mp.JoinableQueue()
    outQ = mp.Queue()    

    for i,item in enumerate(data):
        inQ.put((i,item))

    for i in xrange(nProcesses):
        proc=mp.Process(target=worker, args=[inQ, outQ])
        proc.daemon=True
        proc.start()

    inQ.join()
    cnt=0
    for _ in range(N):
        result=outQ.get()
        print(result)
        cnt+=1
        print(cnt)      
    print('got {c} items'.format(c=cnt))

if __name__ == '__main__':
    main()

【讨论】:

  • 这并没有真正回答问题,而是解决了问题。哦,天哪,我早该发现的!非常感谢!
  • @unutbu:你真的使用过那个模块吗?在我看来真的很乱。
  • @chuck:我还没有必要将它用于严肃的工作,但上面的代码有效,是吗?你觉得这个模块很乱怎么办?
  • 乍一看似乎有很多注释掉的代码行。它似乎需要 Cython,我没有看到任何记录,比如我在使用它时需要注意的内容。此外,自述文件似乎已损坏。您是否有过它可以正常工作的经验?
  • @unutbu:对于 1000x10000 矩阵来说它失败了,只有 80 MB,我想至少达到 10GB。我猜共享内存部分不够大?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-04-18
  • 1970-01-01
  • 2010-10-29
  • 1970-01-01
相关资源
最近更新 更多