【问题标题】:python multiprocessing worker memory consumption increases indefinitelypython多处理工作者内存消耗无限增加
【发布时间】:2017-06-02 15:42:37
【问题描述】:

我有一个需要搜索特定值的 Excel 2010 文件 (xlsx) 列表。由于 xslx 是二进制格式,这不能用普通的文本编辑器完成。所以我对每个文件执行以下操作

  1. 获取文件名
  2. 在熊猫中打开
  3. 将数据帧转换为 numpy 数组
  4. 检查数组的值

这需要多处理,因为它不受 I/O 限制。熊猫的东西和数组转换需要时间。所以我设置了我的脚本的多处理版本(见下文):

问题是每个工作进程的内存消耗。尽管每个 xlsx 文件只有 100kb,但它在每个工作人员中不断累积达到 2GB 的峰值。我不明白为什么在处理新文件之前没有释放内存。这样我在处理我的文件列表之前就耗尽了内存。

问题似乎不是队列,而是熊猫的东西。

这是我的代码。可以使用系统上的任何 xlsx 文件对其进行测试。

import pandas as pd
import multiprocessing as mp
import glob

path = r'c:\temp'
fileFilter = 'serial.xlsx'
searchString = '804.486'


def searchFile(tasks, results, searchString):
    """Iterates over files in tasks and searches in file for the
    occurence of 'searchString'.

    Args:
    -----
    tasks: queue of strings
        Files to look in
    results: queue of strings
        Files where the searchString was found
    searchString: str
        the string to be searched
    """
    # for files in the queue
    for task in iter(tasks.get, 'STOP'):
        # read the filestructre into memory
        xfile = pd.ExcelFile(task)
        # iterate all sheets
        for sheet in xfile.sheet_names[:3]:
            # read the sheet
            data = pd.read_excel(xfile, sheet)
            # check if searchString is in numpy representation of dataframe
            if searchString in data.values.astype(str):
                # put filename in results queue
                results.put(task)
                break
        xfile.close()

if __name__ == "__main__":
    # get all files matching the filter that are in the root path
    print('gathering files')
    files = glob.glob(path + '\**\{}'.format(fileFilter), recursive=True)

    # setup of queues and variables
    n_proc = 2
    tasks = mp.Queue()
    results = mp.Queue()

    print('Start processing')
    # setup processes and start them
    procs = [mp.Process(target=searchFile,
                        args=(tasks, results, searchString))
             for x in range(n_proc)]
    for p in procs:
        p.daemon = True
        p.start()

    # populate queue
    for file in files:
        tasks.put(file)

    for proc in procs:
        tasks.put('STOP')

    for p in procs:
        p.join()

    # print results
    for result in range(results.qsize()):
        print(results.get())

    print('Done')

【问题讨论】:

  • 附带说明 - .xlsx 文件是 zip 压缩的档案,里面有 xml。如果您唯一需要的是搜索一个值,您可以解压缩并搜索流,而无需任何额外的模块。
  • 这不是唯一要做的事情......我可以用 python 内部 zip-module 解压缩它们吗?
  • 试图解压缩 xlsx 文件,它会生成许多 xml 文档:对于样式,对于每个工作表,主题,......找到带有值的 xml 文档是相当困难的。
  • 是的,这有点乱——但这种混乱是有秩序的。如果您主要搜索或提取值 - 与 pandas 相比,使用 SAX / DOM 解析器可能更容易(并且内存占用更少)。否则 - 看看其他可以使它更容易的库。

标签: python multiprocessing out-of-memory


【解决方案1】:

在 gc 中似乎有问题,它无法在你永远不会离开的函数上下文中收集 pandas 帧。您可以使用multiprocessing.Pool.map,它可以为您处理队列。将为每个项目调用 Worker 函数并让 gc 完成工作。您也可以使用maxtasksperchild Pool 构造函数参数来限制工人处理的项目数量。

import glob
import multiprocessing


def searchFile(task, searchString):
    xfile = pd.ExcelFile(task)
    ...
    if found:
        return task


if __name__ == '__main__':
    files = glob.glob(path + '\**\{}'.format(fileFilter), recursive=True)
    searchString = '804.486'

    pool = multiprocessing.Pool(2, maxtasksperchild=10)

    args = ((fname, searchString) for fname in files)
    matchedFiles = filter(None, pool.map(searchFile, args))
    pool.close()

【讨论】:

  • 试过这个...没有解决问题,因为问题不是代码,而是文件之一。检查了一组不同的文件,它可以正常工作。还是谢谢。
猜你喜欢
  • 2015-01-04
  • 1970-01-01
  • 2014-11-06
  • 1970-01-01
  • 2021-03-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多