【问题标题】:Multiprocessing of shared list共享列表的多处理
【发布时间】:2014-05-14 00:13:20
【问题描述】:

我写了一个这样的程序:

from multiprocessing import Process, Manager

def worker(i):
    x[i].append(i)

if __name__ == '__main__':
    manager = Manager()
    x = manager.list()
    for i in range(5):
        x.append([])
    p = []
    for i in range(5):
        p.append(Process(target=worker, args=(i,)))
        p[i].start()

    for i in range(5):
        p[i].join()

    print x

我想在进程之间创建一个共享列表,每个进程修改其中的一个列表。但是这个程序的结果是一个空列表的列表:[[],[],[],[],[]]。

怎么了?

【问题讨论】:

  • 调试的时候可以在def work(i)里面设置断点吗?那可能不叫
  • @linpingta 我在worker(i)中添加了一条打印语句,消息打印出来了。
  • 您确定要在worker 中附加到x[i] 而不是x(全局)吗?
  • @beroe 是的,这只是我遇到的问题的一个例子。

标签: python multiprocessing


【解决方案1】:

我认为这是因为管理器的实施方式存在怪癖。

如果您创建两个 Manager.list 对象,然后将其中一个列表附加到另一个,则您附加的列表类型会在父列表中发生变化:

>>> type(l)
<class 'multiprocessing.managers.ListProxy'>
>>> type(z)
<class 'multiprocessing.managers.ListProxy'>
>>> l.append(z)
>>> type(l[0])
<class 'list'>   # Not a ListProxy anymore

l[0]z 不是同一个对象,因此不会像您期望的那样表现:

>>> l[0].append("hi")
>>> print(z)
[]
>>> z.append("hi again")
>>> print(l[0])
['hi again']

如您所见,更改嵌套列表不会对 ListProxy 对象产生任何影响,但更改 ListProxy 对象确实会更改嵌套列表。文档居然explicitly notes this

注意

对 dict 和 list 代理中的可变值或项目的修改将 不能通过管理器传播,因为代理没有办法 知道它的值或项目何时被修改。要修改这样的项目, 您可以将修改后的对象重新分配给容器代理:

翻看源码可以看到,当你在一个ListProxy上调用append时,append调用实际上是通过IPC发送给一个manager对象,然后manager调用append在共享列表上。这意味着append 的参数需要腌制/解封。在 unpickling 过程中,ListProxy 对象变成了一个常规的 Python 列表,它是 ListProxy 指向的内容(也就是它的所指对象)的副本。这也是noted in the documentation

代理对象的一个​​重要特性是它们是可腌制的,因此 它们可以在进程之间传递。但是请注意,如果代理 被发送到相应管理器的进程,然后将其解压 产生所指本身。这意味着,例如,一个共享对象可以包含第二个

那么,回到上面的例子,如果 l[0] 是z 的副本,为什么更新z 也会更新l[0]?因为副本也注册到 Proxy 对象,所以当您更改 ListProxy(上例中的z)时,它也会更新列表的所有注册副本(上例中的l[0])。但是,副本对代理一无所知,因此当您更改副本时,代理不会更改。

因此,为了使您的示例正常工作,您需要在每次要修改子列表时创建一个新的 manager.list() 对象,并且只直接更新该代理对象,而不是通过父级的索引来更新它列表:

#!/usr/bin/python

from multiprocessing import Process, Manager

def worker(x, i, *args):
    sub_l = manager.list(x[i])
    sub_l.append(i)
    x[i] = sub_l


if __name__ == '__main__':
    manager = Manager()
    x = manager.list([[]]*5)
    print x
    p = []
    for i in range(5):
        p.append(Process(target=worker, args=(x, i)))
        p[i].start()

    for i in range(5):
        p[i].join()

    print x

这是输出:

dan@dantop2:~$ ./multi_weirdness.py 
[[0], [1], [2], [3], [4]]

【讨论】:

  • 感谢您的回答!我打算通过父列表更改子列表,但似乎我不能。我最初使用列表,但它减慢了我的并行运行速度。有没有其他使用共享列表的方法可以避免这个问题?
  • 请参阅我答案末尾的示例。它展示了如何通过从子列表创建新的 manager.list、更新新的代理对象,然后将代理重新插入父列表来解决此问题。
  • 我遇到了类似的问题,但解决方法不是一个选项,因为创建太多代理会花费很多时间。还有其他解决方法吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-05-28
  • 2019-12-11
  • 1970-01-01
  • 1970-01-01
  • 2017-01-02
  • 2019-05-10
  • 2016-05-14
相关资源
最近更新 更多