【问题标题】:Python Multiprocessing concurrency using Manager, Pool and a shared list not working使用管理器、池和共享列表的 Python 多处理并发不起作用
【发布时间】:2015-10-08 10:05:43
【问题描述】:

我正在学习 python 多处理,并且我正在尝试使用此功能来填充一个列表,其中包含 os 中存在的所有文件。但是,我写的代码只是顺序执行的。

#!/usr/bin/python
import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] #Gets a top level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

当我检查进程树时,我只能看到一个智利进程产生。 (man pstree 说 {} 表示父进程产生的子进程。)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)`

我一直在寻找的是,为每个 tld 目录生成一个进程,填充共享列表 files,这将是大约 10-15 个进程,具体取决于目录的数量。我做错了什么?

编辑::

我使用multiprocessing.Pool 创建工作线程,这次是 进程已生成,但在我尝试使用multiprocessing.Pool.map() 时出错。我指的是python文档中显示的以下代码

from multiprocessing import Pool
def f(x):
return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3])) 

按照这个例子,我将代码重写为

import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

它正在分叉多个进程。

---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
                               ---snip---

但是代码出错了

Process PoolWorker-2: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get return recv() AttributeError: 'module' object has no attribute 'get_files' self._target(*self._args, **self._kwargs) self.run() task = get() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get AttributeError: 'module' object has no attribute 'get_files' self.run()

我在这里做错了什么,为什么 get_files() 函数会出错?

【问题讨论】:

    标签: python concurrency parallel-processing multiprocessing python-multiprocessing


    【解决方案1】:

    这仅仅是因为您在定义函数 get_files 之前实例化了您的池:

    import os
    import multiprocessing
    
    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    manager = multiprocessing.Manager()
    
    files = manager.list()
    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))
    
    pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here
    
    pool.map(get_files, [x for x in tld])
    pool.close()
    pool.join()
    print len(files)
    

    进程的总体思路是,在您启动它的那一刻,您会分叉主进程的内存。所以任何定义主进程之后分叉不会在子进程中。

    如果你想要一个共享内存,你可以使用threading 库,但是你会遇到一些问题(参见:The global interpreter lock

    【讨论】:

    • 谢谢你,成功了。但是,我想知道,既然 tld 已经定义,为什么要在函数重要之前定义池?定义池时没有引用函数。
    • 您的pool.map 中有一个:)。通过使用pool.map,您要求您的进程使用函数get_files
    • 同意。但是 pool.map 是在函数之后定义的,尽管 pool 是之前定义的,因为在定义 pool 时,它只是要产生的工作进程的数量,根据 python doc。 ` pool = Pool(processes=4) # 启动 4 个工作进程` 请纠正我我在哪里误解。再次感谢@FunkySayu
    • 我不确定你的句子的所有意思(我的英语需要提高:))。据我了解,您认为 Pool 只设置 number 个进程,而不是 start 个进程。那不是真的(感谢上帝:D)。总体思路是启动池以便之后使用它(例如,使用pool.map)。初始化过程相当长(相信我,我为此做了很多工作),而且应该只运行一次。
    • 这很有意义。无论你缺乏什么英语,你都可以用 Python 弥补 :) 太棒了。感谢所有的帮助。谢谢@FunkySayu :)
    猜你喜欢
    • 1970-01-01
    • 2015-05-28
    • 2021-12-27
    • 1970-01-01
    • 2021-05-07
    • 2013-06-14
    • 2020-06-25
    • 2011-01-06
    • 1970-01-01
    相关资源
    最近更新 更多