【问题标题】:Cannot grok python multiprocessing无法理解 python 多处理
【发布时间】:2012-02-28 08:15:36
【问题描述】:

我需要为我的数据库的每个元素运行一个函数。

当我尝试以下操作时:

from multiprocessing import Pool
from pymongo import Connection

def foo():
...


connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data

my_pool = Pool(6)
my_pool.map(foo, db1.index.find())

我收到以下错误:

作业 1,'python myscript.py' 被信号 SIGKILL 终止(强制退出)

我认为这是由db1.index.find() 在尝试返回数百万个数据库元素时吃掉所有可用内存造成的......

我应该如何修改我的代码以使其正常工作?

这里有一些日志:

dmesg | tail -500 | grep memory
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child
[177891.001379]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child
[177891.025399]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250

下面的实际功能:

def create_barrel(item):
    connection = Connection('127.0.0.1', 27017)
    db = connection.data
    print db.index.count()
    barrel = []
    fls = []
    if 'name' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name']))
        name = item['name']
    elif 'name.utf-8' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
        name = item['name.utf-8']
    else:
        print item.keys()
    if 'files' in item.keys():
        for file in item['files']:
            if 'path' in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
                fls.append(("\\".join(file['path']),file['length']))
            elif 'path.utf-8'  in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
                fls.append(("\\".join(file['path.utf-8']),file['length']))
            else:
                print file
                barrel.append(WhitespaceTokenizer().tokenize(file))
    if len(fls) < 1:
        fls.append((name,item['length']))
    barrel = sum(barrel,[])
    for s in barrel:
        vs = re.findall("\d[\d|\.]*\d", s) #versions i.e. numbes such as 4.2.7500 
    b0 = []
    for s in barrel:
        b0.append(re.split("[" + string.punctuation + "]", s))
    b1 = filter(lambda x: x not in string.punctuation, sum(b0,[]))
    flag = True
    while flag:
        bb = []
        flag = False
        for bt in b1:
            if bt[0] in string.punctuation:
                bb.append(bt[1:])
                flag = True
            elif bt[-1] in string.punctuation:
                bb.append(bt[:-1])
                flag = True
            else:
                bb.append(bt)
        b1 = bb
    b2 = b1 + barrel + vs
    b3 = list(set(b2))
    b4 = map(lambda x: x.lower(), b3)
    b_final = {}
    b_final['_id'] = item['_id']
    b_final['tags'] = b4
    b_final['name'] = name
    b_final['files'] = fls
    print db.barrels.insert(b_final)

我注意到有趣的事情。然后我按 ctrl+c 停止进程我得到以下信息:

python index2barrel.py 
Traceback (most recent call last):
  File "index2barrel.py", line 83, in <module>
    my_pool.map(create_barrel, db1.index.find, 6)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
    iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable

我的意思是,为什么多处理试图将某些东西转换为列表?不是问题的根源吗?

来自堆栈跟踪:

brk(0x231ccf000)                        = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [manymany times]
brk(0x2320d5000)                        = 0x2320d5000
.... manymany times

上面的示例在 strace 输出中不断出现,出于某种原因 strace -o logfile python myscript.py 不停止。它只是吃掉所有可用的内存并写入日志文件。

更新。使用 imap 而不是 map 解决了我的问题。

【问题讨论】:

  • 这是Linux,是吗?系统日志中是否有任何内容显示 OOM 杀手终止了您的进程?
  • @cha0site 是的,这是 ubuntu。我将检查日志,但从 htop 我看到内存(所有 8 GB!)在几秒钟内就被吃掉了,所以我认为问题是 map(...) 试图将整个数据库放在列表中。不要如何避免这种情况。
  • 我想我们可能需要看看foo() 做了什么。 find() 返回一个Cursor,它不应该将所有记录都存储在内存中......
  • 一个想法:创建池的代码在顶层运行,而不是在函数内(或 name == 'main ') 条款?如果是这样,它也会被每个子进程执行,这是非常糟糕的。 :)
  • @Moonwalker 为了运行该功能,每个子进程可能需要导入您的模块,这意味着所有顶级代码将由每个进程执行。但显然,如果您的代码需要在 Windows 上运行,您只需要担心这一点:docs.python.org/library/multiprocessing.html#windows

标签: python mongodb mapreduce multiprocessing


【解决方案1】:

由于find() 操作将光标返回到 map 函数,并且您说这样做时它运行没有问题 for item in db1.index.find(): create_barrel(item) 看起来create_barrel 函数没问题。

您能否尝试限制游标中返回的结果数量,看看是否有帮助?我认为语法是:

db1.index.find().limit(100)

如果你可以试试这个,看看它是否有帮助,它可能有助于找出问题的原因。

EDIT1:我认为您使用 map 函数以错误的方式解决此问题 - 我认为您应该在 mongo python 驱动程序中使用 map_reduce - 这样 map 函数将由 mongod 进程执行。

【讨论】:

  • 嗯,这行得通。我需要在循环中与 skip(n*100) 或其他东西一起迭代它,但是伙计,这个解决方案感觉有点脏:|,我的意思是,我已经有了 map 函数,为什么要使用循环......
  • 我猜你可以尝试在 `def len__(self):` ` # __len 中添加 __len__ 已弃用(替换为 size())并将删除。 # # 弃用的原因有点复杂: # list(...) 调用 _PyObject_LengthHint 来猜测返回的列表需要多少空间。该方法依次调用 len。 # 因此,如果我们保留 len,在 Cursor 实例上调用 list(...) 至少需要两次往返数据库 - 这使得它的速度大约是 [x for x 的两倍in Cursor],这对用户来说并不明显。`
  • 我尝试按照您和 Lycha 的建议添加 'len',但失败了 Traceback (most recent call last): File "index2barrel.py", line 86, in &lt;module&gt; my_pool.map(create_barrel, cursor, chunksize=10) File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map return self.map_async(func, iterable, chunksize).get() File "/usr/lib/python2.7/multiprocessing/pool.py", line 286, in map_async if len(iterable) == 0: TypeError: object of type 'Cursor' has no len() 我尝试自己添加 len 函数,但没有成功。
  • mongo的map_reduce函数必须用js写,但是因为库的关系,我想用python。不过感谢您的建议,有限制的想法正在发挥作用。
【解决方案2】:

map() 函数将项目以块的形式提供给给定的函数。默认情况下,这个块大小是这样计算的(link to source):

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)

在您的情况下,这可能会导致块大小过大,并让进程耗尽内存。尝试像这样手动设置块大小:

my_pool.map(foo, db1.index.find(), 100)

编辑:您还应该考虑重用数据库连接并在使用后关闭它们。现在您为每个项目创建新的数据库连接,并且您不会向它们调用close()

EDIT2:还要检查while 循环是否进入无限循环(将解释症状)。

EDIT3:根据您添加的回溯,map 函数尝试将光标转换为列表,从而导致一次获取所有项目。发生这种情况是因为它想找出集合中有多少项目。这是来自pool.pymap() 代码的一部分:

if not hasattr(iterable, '__len__'):
    iterable = list(iterable)

您可以尝试这样做以避免转换为列表:

cursor = db1.index.find()
cursor.__len__ = cursor.count()
my_pool.map(foo, cursor)

【讨论】:

  • 我已经尝试过了(即使块大小 = 6),也遇到了同样的问题:python 启动了额外的进程(达到指定的数量,在我的示例中为 6),这些进程正在吃 ram 直到没有公羊,一切都停止了。当我只运行这个函数时,for 循环内存消耗几乎不明显。
  • @Moonwalker 我又添加了一条关于使用数据库连接的注释。
  • 我什至从函数中删除了 db 连接以查看它是否有效,但没有运气:( 似乎该函数什么都不做,没有一个元素被写入数据库......跨度>
  • @Moonwalker 好的,在这种情况下,我会检查while 循环是否进入无限循环。你检查过吗?
  • 是的,我已经检查过了。此版本完美运行:对于 db1.index.find() 中的项目:create_barrel(item)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-29
  • 1970-01-01
  • 2020-09-06
  • 2012-02-06
相关资源
最近更新 更多