【问题标题】:Multiprocessing -- Thread Pool Memory Leak?多处理——线程池内存泄漏?
【发布时间】:2018-12-18 18:07:38
【问题描述】:

我正在观察我无法向自己解释的内存使用情况。下面我提供了我的实际代码的精简版本,它仍然表现出这种行为。该代码旨在完成以下任务:

以 1000 行为单位读取文本文件。每一行都是一个句子。将这 1000 个句子分成 4 个生成器。将这些生成器传递到线程池,并在 250 个句子上并行运行特征提取。 在我的实际代码中,我从整个文件的所有句子中积累了特征和标签。 现在奇怪的事情来了:即使没有累积这些值,内存也会被分配但不会再次释放!我认为它与线程池有关。总共占用的内存量取决于为任何给定单词提取了多少特征。我在这里用range(100) 模拟这个。看看:

from sys import argv
from itertools import chain, islice
from multiprocessing import Pool
from math import ceil


# dummyfied feature extraction function
# the lengt of the range determines howmuch mamory is used up in total,
# eventhough the objects are never stored
def features_from_sentence(sentence):
    return [{'some feature'  'some value'} for i in range(100)], ['some label' for i in range(100)]


# split iterable into generator of generators of length `size`
def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))


def features_from_sentence_meta(l):
    return list(map (features_from_sentence, l))


def make_X_and_Y_sets(sentences, i):
    print(f'start: {i}')
    pool = Pool()
    # split sentences into a generator of 4 generators
    sentence_chunks = chunks(sentences, ceil(50000/4))
    # results is a list containing the lists of pairs of X and Y of all chunks
    results = map(lambda x : x[0], pool.map(features_from_sentence_meta, sentence_chunks))
    X, Y = zip(*results)
    print(f'end: {i}')
    return X, Y


# reads file in chunks of `lines_per_chunk` lines
def line_chunks(textfile, lines_per_chunk=1000):
    chunk = []
    i = 0
    with open(textfile, 'r') as textfile:
        for line in textfile:
            if not line.split(): continue
            i+=1
            chunk.append(line.strip())
            if i == lines_per_chunk:
                yield chunk
                i = 0
                chunk = []
        yield chunk

textfile = argv[1]

for i, line_chunk in enumerate(line_chunks(textfile)):
    # stop processing file after 10 chunks to demonstrate
    # that memory stays occupied (check your system monitor)
    if i == 10:
        while True:
            pass
    X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

我用来调试的文件有 50000 行非空行,这就是我在一个地方使用硬编码的 50000 的原因。如果你想使用同一个文件,他是一个链接,方便你:

https://www.dropbox.com/s/v7nxb7vrrjim349/de_wiki_50000_lines?dl=0

现在,当您运行此脚本并打开系统监视器时,您会观察到内存已用完,并且使用量一直持续到第 10 个块,我人为地进入一个无限循环以证明内存仍在使用中,即使虽然我从不存储任何东西。

你能解释一下为什么会这样吗?我似乎遗漏了一些关于应该如何使用多处理池的内容。

【问题讨论】:

  • 一旦你使用了一些内存,Python 很少将它返回给操作系统。这通常不是问题——内存没有泄漏,它只是在三个级别的空闲列表之一中,所以当你再次需要内存时,它不必允许,所以它运行得更快。当它一个问题时,答案很简单:你已经在使用multiprocessing,所以你可以经常回收池进程。
  • 您的意思是我目前正在为每 1000 行块创建新池,并且在处理块后它们不会被销毁?如何为文件中的所有 1000 行块重用相同的 4 个线程?
  • 好的,首先,您根本不是在创建线程,而是在创建进程。但是第二,不,在最初的 4 之后,您没有创建任何新流程。您正在一遍又一遍地重用相同的流程,就像您想要的那样。这正是内存没有被释放的原因:内存通常在进程退出之前不会被释放。但是,同样,这首先可能不是问题。
  • 我的意思是流程,是的。但这里是个问题。那么我怎样才能按照您的建议回收(销毁并制造新的?)流程?
  • 是什么让您认为这里有问题?您要解决的实际症状是什么?不是您测量的最无意义的值,而是您试图通过测量它们来诊断的症状?

标签: python memory-management memory-leaks multiprocessing threadpool


【解决方案1】:

首先,让我们澄清一些误解——尽管事实证明,这实际上并不是一开始探索的正确途径。

当您在 Python 中分配内存时,它当然必须从操作系统中获取该内存。

但是,当您释放内存时,它很少会返回到操作系统,直到您最终退出。相反,它进入了一个“空闲列表”——或者实际上,用于不同目的的多个级别的空闲列表。这意味着下次您需要内存时,Python 已经拥有它,并且可以立即找到它,而无需与操作系统交谈以分配更多。这通常会使内存密集型程序更快。

但这也意味着——尤其是在现代 64 位操作系统上——试图通过查看活动监视器/任务管理器/等来了解您是否真的有任何内存压力问题。几乎没用。


标准库中的tracemalloc 模块提供了低级工具来查看您的内存使用情况。在更高的级别上,您可以使用memory_profiler 之类的东西,它(如果您启用tracemalloc 支持——这很重要)可以将该信息与来自psutil 等来源的操作系统级别信息放在一起,以找出事情在哪里去。

但是,如果您没有发现任何实际问题 — 您的系统不会进入交换地狱,您不会收到任何 MemoryError 异常,您的性能不会达到线性扩展的奇怪悬崖到 N ,然后突然在 N+1 处下地狱,等等——你通常不需要首先为这些而烦恼。


如果您确实发现了一个问题,那么幸运的是,您已经解决了一半。正如我在顶部提到的,您分配的大多数内存在您最终退出之前不会返回给操作系统。但是,如果您所有的内存使用都发生在子进程中,并且这些子进程没有状态,您可以让它们随时退出并重新启动。

这样做当然会产生性能成本 — 进程拆卸和启动时间、必须重新开始的页面映射和缓存,以及要求操作系统重新分配内存等等。而且还有一个复杂性成本——你不能只运行一个池并让它做它的事情;你必须参与它的事情,让它为你回收流程。

multiprocessing.Pool 类中没有内置支持。

当然,您可以构建自己的Pool。如果您想花哨,可以查看multiprocessing 的源代码并执行它的功能。或者,您可以从Process 对象列表和一对Queues 中构建一个普通池。或者您可以直接使用 Process 对象,而无需抽象池。


您可能会遇到内存问题的另一个原因是您的各个进程都很好,但是您的进程太多了。

事实上,这里似乎就是这种情况。

您在此函数中创建了一个由 4 个工作人员组成的 Pool

def make_X_and_Y_sets(sentences, i):
    print(f'start: {i}')
    pool = Pool()
    # ...

…你为每个块调用这个函数:

for i, line_chunk in enumerate(line_chunks(textfile)):
    # ...
    X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

因此,您最终会为每个块添加 4 个新进程。即使每个内存使用率都非常低,但同时拥有数百个内存也会增加。

更不用说数百个进程在 4 个内核上竞争可能会严重影响您的时间性能,因此您将时间浪费在上下文切换和操作系统调度上,而不是做真正的工作。

正如您在评论中指出的那样,解决此问题的方法很简单:只需为每个调用创建一个全局 pool 而不是一个新的。


很抱歉这里所有的 Columbo,但是……还有一件事……这段代码在你的模块的顶层运行:

for i, line_chunk in enumerate(line_chunks(textfile)):
    # ...
    X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

……这就是试图启动池和所有子任务的代码。但是该池中的每个子进程都需要import 这个模块,这意味着它们最终都将运行相同的代码,并启动另一个池和一整套额外的子任务。

你大概是在 Linux 或 macOS 上运行这个,其中默认的startmethodfork,这意味着multiprocessing 可以避免这个import,所以你没有问题。但是对于其他 startmethods,这段代码基本上是一个会吃掉你所有系统资源的分叉炸弹。其中包括spawn,这是 Windows 上的默认启动方法。因此,如果任何人有可能在 Windows 上运行此代码,您应该将所有顶级代码放在 if __name__ == '__main__': 保护中。

【讨论】:

  • 嗯,我想我只是通过将pool 设为全局变量来解决我的问题。内存使用量现在保持不变。
  • @lotolmencre 啊,刚刚注意到了。您实际上正在为每个块创建单独的池!糟糕……如果您理解为什么会出现问题,您应该编写自己的答案并接受它。 (如果它有帮助,你仍然可以投票。)或者,如果它看起来像是一个明显的拼写错误,对其他人没有帮助,我们甚至可以将问题作为拼写错误关闭。
  • 在我看来这不是一个类型,因为我不知道有必要继续使用同一个池对象。但现在我愿意。谢谢。
猜你喜欢
  • 2017-12-07
  • 2015-05-20
  • 2021-09-29
  • 2012-05-12
  • 2014-12-30
  • 2018-06-19
  • 1970-01-01
  • 1970-01-01
  • 2014-10-29
相关资源
最近更新 更多