【问题标题】:Multiprocessing queue - Why does the memory consumption increase?多处理队列 - 为什么内存消耗会增加?
【发布时间】:2015-01-04 08:32:15
【问题描述】:

以下脚本生成 100 个大小为 100000 的随机字典,将每个 (key, value) 元组馈送到队列中,同时一个单独的进程从队列中读取:

import multiprocessing as mp

import numpy.random as nr


def get_random_dict(_dummy):
    return dict((k, v) for k, v in enumerate(nr.randint(pow(10, 9), pow(10, 10), pow(10, 5))))

def consumer(q):
    for (k, v) in iter(q.get, 'STOP'):
        pass

q = mp.Queue()
p = mp.Process(target=consumer, args=(q,))
p.start()
for d in mp.Pool(1).imap_unordered(get_random_dict, xrange(100)):
    for k, v in d.iteritems():
        q.put((k, v))
q.put('STOP')
p.join()

我希望内存使用量保持不变,因为消费者进程在主进程提供数据时从队列中提取数据。我验证了数据不会在队列中累积。

但是,我监控了内存消耗,并且随着脚本的运行它不断增加。如果我用for _ in xrange(100): d = get_random_dict() 替换imap_unordered,那么内存消耗是恒定的。有什么解释?

【问题讨论】:

标签: python python-2.7 queue multiprocessing python-multiprocessing


【解决方案1】:

Pool.imapimap 并不完全相同。相同之处在于它可以像imap 一样使用,并且它返回一个迭代器。但是,实现方式完全不同。后备池将尽其所能尽快完成分配给它的所有工作,无论迭代器的消耗速度有多快。如果您只想在请求时处理作业,那么使用multiprocessing 毫无意义。不妨使用itertools.imap 并完成它。

因此,您的内存消耗增加的原因是因为池创建字典的速度快于您的消费者进程消耗它们的速度。这是因为池从工作进程检索结果的方式是单向的(一个进程写入和进程读取),因此不需要显式同步机制。而Queue 是双向的——两个进程都可以读取和写入队列。这意味着需要在使用队列的进程之间进行显式同步,以确保它们不会竞争将下一个项目添加到队列或从队列中删除项目(从而使队列处于不一致状态)。

【讨论】:

【解决方案2】:

我认为主要问题是使用multiprocessing.Pool 收集在一个进程(Pool 进程)中创建的字典,然后将它们放入主进程的队列中。我认为(我可能错了)Pool 创建了自己的一些队列,而这些可能是数据积累的队列。

如果你像这样放置一些调试打印,你可以清楚地看到这一点:

...
def get_random_dict(_dummy):
    print 'generating dict'
    ...
...
for d in mp.Pool(1).imap_unordered(get_random_dict, xrange(100)):
    print 'next d'
    ...

然后你会看到这样的东西:

generating dict
generating dict
next d
generating dict
generating dict
generating dict
generating dict
generating dict
next d
...

这清楚地表明您在某处积累了生成的dicts (可能在Pool的内管中)。

我认为更好的解决方案是将数据从 get_random_dict 直接加入队列并放弃使用*map 函数 来自Pool

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-30
    • 2018-10-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多