【问题标题】:Python multiprocessing.Queue vs multiprocessing.manager().Queue()Python multiprocessing.Queue 与 multiprocessing.manager().Queue()
【发布时间】:2017-09-12 08:18:17
【问题描述】:

我有这样一个简单的任务:

def worker(queue):
    while True:
        try:
            _ = queue.get_nowait()
        except Queue.Empty:
            break

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # queue = multiprocessing.Queue()
    queue = manager.Queue()

    for i in range(5):
        queue.put(i)

    processes = []

    for i in range(2):
        proc = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(proc)
        proc.start()

    for proc in processes:
        proc.join()

似乎 multiprocessing.Queue 可以完成我需要的所有工作,但另一方面,我看到许多 manager().Queue() 的示例并且无法理解我真正需要什么。看起来 Manager().Queue() 使用某种代理对象,但我不明白这些目的,因为 multiprocessing.Queue() 在没有任何代理对象的情况下做同样的工作。

所以,我的问题是:

1) multiprocessing.Queue 和 multiprocessing.manager().Queue() 返回的对象有什么区别?

2) 我需要使用什么?

【问题讨论】:

标签: python queue multiprocessing python-multiprocessing


【解决方案1】:

我最近遇到了Manager().Queue() 的一个问题,当SyncManager 对象(由multiprocessing.Manager() 返回)似乎死了,并且它管理的队列永远阻塞(即使使用*_nowait())。

我不确定原因,或者如果 SyncManager 真的死了,我唯一的线索是我从一个具有 __del__() 的类实例调用 multiprocessing.Manager(),它记录了调用它的进程,我可以看到这是从 SyncManager 进程调用的__del__()

这意味着我的对象在 SyncManager 进程中有一个副本,并且它被垃圾回收了。这可能意味着只有我的对象被删除,并且 SyncManager 很好,但我确实看到相应的队列变得无响应与 SyncManager 进程中的 __del__() 调用相关。

我不知道我的对象如何在 SyncManager 进程中结束。我通常会抽出 50-200 名经理——有些人的生命周期重叠,有些则没有——直到我看到这个问题。对于解释器退出时存在的对象,不会调用__del__(),并且我通常不会看到来自__del__() 的此日志导致SyncManager 对象死亡,只是偶尔。可能当出现问题时,SyncManager 对象首先处理其对象,然后解释器才会退出,这就是为什么我有时会看到__del__() 调用。

我确实看到我的队列变得无响应,即使在我没有看到从 SyncManager 调用 __del__() 的情况下也是如此。

我还看到 SyncManager “死”而没有引起进一步的问题。

“无响应”是指:

queue.get(timeout=1)
queue.put(timeout=1)

永不回头。

queue.get_nowait(timeout=1)
queue.put_nowait(timeout=1)

永不回头。

这变得有点复杂,然后我本来想要,但我把细节透露出来,以防万一它对某人有帮助。

我之前使用Manager().Queue()很长时间没有任何问题。我怀疑要么是实例化了很多管理器对象导致了问题,要么实例化了很多管理器导致了一个一直存在的问题。

我使用Python 3.6.5

【讨论】:

【解决方案2】:

虽然我对这个主题的理解有限,但从我所做的事情来看,multiprocessing.Queue() 和 multiprocessing.Manager().Queue() 之间存在一个主要区别:

  • multiprocessing.Queue() 是一个对象,而 multiprocessing.Manager().Queue() 是一个地址(代理),指向由 multiprocessing.Manager() 对象管理的共享队列。
  • 因此您不能将普通的 multiprocessing.Queue() 对象传递给 Pool 方法,因为它不能被腌制。
  • 此外,python doc 告诉我们在使用 multiprocessing.Queue() 时要特别注意,因为它可能会产生不良影响

注意 当一个对象被放入队列时,该对象被腌制并且后台线程稍后将腌制数据刷新到底层管道。这会产生一些令人惊讶的后果,但不会造成任何实际困难——如果它们真的困扰您,那么您可以使用由经理创建的队列。 将对象放入空队列后,队列的 empty() 方法返回 False 和 get_nowait() 可以在不引发 Queue.Empty 的情况下返回之前可能会有一个无限小的延迟。 如果多个进程正在对对象进行排队,则对象可能会在另一端乱序接收。但是,由同一进程排队的对象将始终按预期顺序排列。

警告 如上所述,如果子进程已将项目放入队列(并且它没有使用 JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都被刷新到管道。 这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。类似地,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会挂起退出。 请注意,使用管理器创建的队列不存在此问题。

通过将队列设置为全局变量并在初始化时为所有进程设置它,可以将 multiprocessing.Queue() 与 Pool 一起使用:

queue = multiprocessing.Queue()
def initialize_shared(q):
    global queue
    queue=q

pool= Pool(nb_process,initializer=initialize_shared, initargs(queue,))

将创建具有正确共享队列的池进程,但我们可以争辩说 multiprocessing.Queue() 对象不是为此用途而创建的。

另一方面,manager.Queue() 可以在池子进程之间共享,方法是将它作为函数的普通参数传递。

在我看来,使用 multiprocessing.Manager().Queue() 在任何情况下都很好,而且麻烦更少。使用经理可能会有一些缺点,但我不知道。

【讨论】:

  • 据我所知,托管项目(队列、值等)的缺点是它们速度较慢。常规的多进程项目是共享的,因此访问速度很快,但需要用锁保护。据我所知,multiprocess.manager 等效项是在其自己的进程中处理的实际项目的代理。因此,在管理器保护竞争条件的同时,这也意味着大量的进程间调用,这是昂贵的。因此,如果与共享项目有大量通信,非托管可能会更快(尽管更危险)。
  • 通过提到的解决方法,“队列”将被初始化为每个进程的新队列对象。要为所有进程使用相同的队列,请使用multiprocessing.Manager().Queue()global queue = 仅适用于当前进程全局
  • @Avraham 从我有限的实践经验来看,情况恰恰相反——当我开发多进程视频处理软件时,非托管队列速度慢到令人望而却步。使用托管队列解决了所有问题。相关:stackoverflow.com/questions/47085458
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-22
  • 1970-01-01
  • 2019-05-24
  • 1970-01-01
  • 2012-12-22
  • 2021-08-29
相关资源
最近更新 更多