【问题标题】:Deadlock with big object in multiprocessing.Queuemultiprocessing.Queue 中的大对象死锁
【发布时间】:2020-03-30 10:40:49
【问题描述】:

当你向multiprocessing.Queue 提供一个足够大的对象时,程序似乎挂在奇怪的地方。考虑这个最小的例子:

import multiprocessing

def dump_dict(queue, size):
  queue.put({x: x for x in range(size)})
  print("Dump finished")

if __name__ == '__main__':
  SIZE = int(1e5)
  queue = multiprocessing.Queue()
  process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
  print("Starting...")
  process.start()
  print("Joining...")
  process.join()
  print("Done")
  print(len(queue.get()))

如果SIZE 参数足够小(至少在我的情况下SIZE 足够大,程序就会挂在奇怪的地方.现在,在搜索解释时,即python multiprocessing - process hangs on join for large queue,我总是看到“你需要从队列中消费”的一般答案。但看起来很奇怪的是程序实际上打印了Dump finished,即在将对象放入queue 之后到达代码行。此外,使用 Queue.put_nowait 而不是 Queue.put 并没有什么不同。

最后,如果您使用Process.join(1) 而不是Process.join(),整个过程将完成队列中有完整的字典(即print(len(..)) 行将打印10000)。

谁能给我更多的见解?

【问题讨论】:

  • 代码在队列中插入一个包含 10000 个元素的字典。这是你想要的还是你试图插入每个 dict 项目?
  • @Pynchia 是的,正如我所说的,我知道我在那里放了一个巨大的物体(这是“虚拟”示例)。对我来说似乎很奇怪的是代码的行为(即我希望冻结Queue.put,如果我在那里设置超时,肯定不会“成功”。我希望得到某种解释(或链接以供进一步阅读。 ..)

标签: python multiprocessing queue python-multiprocessing


【解决方案1】:

您需要在 process.join() 之前在父级中 queue.get() 以防止死锁。队列已经产生了一个带有第一个queue.put() 的馈线线程,并且您的工作进程中的MainThread 在退出之前加入了这个馈线线程。因此,在结果完全刷新到(OS-pipe-)缓冲区之前,工作进程不会退出,但是您的结果太大而无法放入缓冲区,并且您的父进程在工作人员退出之前不会从队列中读取,导致死锁。

您会看到print("Dump finished") 的输出,因为实际发送发生在馈线线程中,queue.put() 本身只是作为中间步骤附加到工作进程中的collections.deque

【讨论】:

  • 谢谢!我没有意识到这个错误,现在有意义
  • 很好看,当然,没有其他人在消费,只是生产......赞成
猜你喜欢
  • 2014-02-16
  • 2011-03-08
  • 1970-01-01
  • 1970-01-01
  • 2012-02-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多