【发布时间】:2012-02-28 08:15:36
【问题描述】:
我需要为我的数据库的每个元素运行一个函数。
当我尝试以下操作时:
from multiprocessing import Pool
from pymongo import Connection
def foo():
...
connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data
my_pool = Pool(6)
my_pool.map(foo, db1.index.find())
我收到以下错误:
作业 1,'python myscript.py' 被信号 SIGKILL 终止(强制退出)
我认为这是由db1.index.find() 在尝试返回数百万个数据库元素时吃掉所有可用内存造成的......
我应该如何修改我的代码以使其正常工作?
这里有一些日志:
dmesg | tail -500 | grep memory
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child
[177891.001379] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child
[177891.025399] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
下面的实际功能:
def create_barrel(item):
connection = Connection('127.0.0.1', 27017)
db = connection.data
print db.index.count()
barrel = []
fls = []
if 'name' in item.keys():
barrel.append(WhitespaceTokenizer().tokenize(item['name']))
name = item['name']
elif 'name.utf-8' in item.keys():
barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
name = item['name.utf-8']
else:
print item.keys()
if 'files' in item.keys():
for file in item['files']:
if 'path' in file.keys():
barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
fls.append(("\\".join(file['path']),file['length']))
elif 'path.utf-8' in file.keys():
barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
fls.append(("\\".join(file['path.utf-8']),file['length']))
else:
print file
barrel.append(WhitespaceTokenizer().tokenize(file))
if len(fls) < 1:
fls.append((name,item['length']))
barrel = sum(barrel,[])
for s in barrel:
vs = re.findall("\d[\d|\.]*\d", s) #versions i.e. numbes such as 4.2.7500
b0 = []
for s in barrel:
b0.append(re.split("[" + string.punctuation + "]", s))
b1 = filter(lambda x: x not in string.punctuation, sum(b0,[]))
flag = True
while flag:
bb = []
flag = False
for bt in b1:
if bt[0] in string.punctuation:
bb.append(bt[1:])
flag = True
elif bt[-1] in string.punctuation:
bb.append(bt[:-1])
flag = True
else:
bb.append(bt)
b1 = bb
b2 = b1 + barrel + vs
b3 = list(set(b2))
b4 = map(lambda x: x.lower(), b3)
b_final = {}
b_final['_id'] = item['_id']
b_final['tags'] = b4
b_final['name'] = name
b_final['files'] = fls
print db.barrels.insert(b_final)
我注意到有趣的事情。然后我按 ctrl+c 停止进程我得到以下信息:
python index2barrel.py
Traceback (most recent call last):
File "index2barrel.py", line 83, in <module>
my_pool.map(create_barrel, db1.index.find, 6)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable
我的意思是,为什么多处理试图将某些东西转换为列表?不是问题的根源吗?
来自堆栈跟踪:
brk(0x231ccf000) = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [manymany times]
brk(0x2320d5000) = 0x2320d5000
.... manymany times
上面的示例在 strace 输出中不断出现,出于某种原因 strace -o logfile python myscript.py 不停止。它只是吃掉所有可用的内存并写入日志文件。
更新。使用 imap 而不是 map 解决了我的问题。
【问题讨论】:
-
这是Linux,是吗?系统日志中是否有任何内容显示 OOM 杀手终止了您的进程?
-
@cha0site 是的,这是 ubuntu。我将检查日志,但从 htop 我看到内存(所有 8 GB!)在几秒钟内就被吃掉了,所以我认为问题是 map(...) 试图将整个数据库放在列表中。不要如何避免这种情况。
-
我想我们可能需要看看
foo()做了什么。find()返回一个Cursor,它不应该将所有记录都存储在内存中...... -
一个想法:创建池的代码在顶层运行,而不是在函数内(或 name == 'main ') 条款?如果是这样,它也会被每个子进程执行,这是非常糟糕的。 :)
-
@Moonwalker 为了运行该功能,每个子进程可能需要导入您的模块,这意味着所有顶级代码将由每个进程执行。但显然,如果您的代码需要在 Windows 上运行,您只需要担心这一点:docs.python.org/library/multiprocessing.html#windows
标签: python mongodb mapreduce multiprocessing