【问题标题】:Handle multiple results in Python multiprocessing在 Python 多处理中处理多个结果
【发布时间】:2018-08-14 13:30:07
【问题描述】:

我正在编写一段 Python 代码来使用多处理功能解析大量 ascii 文件。 对于每个文件,我必须执行此函数的操作

def parse_file(file_name):
    record = False
    path_include = []
    buffer_include = []
    include_file_filters = {}
    include_keylines = {}
    grids_lines = []
    mat_name_lines = []
    pids_name_lines = []
    pids_shell_lines= []
    pids_weld_lines = []
    shells_lines = []
    welds_lines = []
    with open(file_name, 'rb') as in_file:
        for lineID, line in enumerate(in_file):
            if record:
                path_include += line
            if record and re.search(r'[\'|\"]$', line.strip()):
                buffer_include.append(re_path_include.search(
                    path_include).group(1).replace('\n', ''))
                record = False
            if 'INCLUDE' in line and '$' not in line:
                if re_path_include.search(line):
                    buffer_include.append(
                        re_path_include.search(line).group(1))
                else:
                    path_include = line
                    record = True
            if line.startswith('GRID'):
                grids_lines += [lineID]
            if line.startswith('$HMNAME MAT'):
                mat_name_lines += [lineID]
            if line.startswith('$HMNAME PROP'):
                pids_name_lines += [lineID]
            if line.startswith('PSHELL'):
                pids_shell_lines += [lineID]
            if line.startswith('PWELD'):
                pids_weld_lines += [lineID]
            if line.startswith(('CTRIA3', 'CQUAD4')):
                shells_lines += [lineID]
            if line.startswith('CWELD'):
                welds_lines += [lineID]
    include_keylines = {'grid': grids_lines, 'mat_name': mat_name_lines, 'pid_name': pids_name_lines, \
                        'pid_shell': pids_shell_lines, 'pid_weld': pids_weld_lines, 'shell': shells_lines, 'weld': welds_lines}
    include_file_filters = {file_name: include_keylines}
    return buffer_include, include_file_filters 

此函数用于循环遍历文件列表,以这种方式(CPU上的每个进程解析一个完整的文件)

import multiprocessing as mp
p = mp.Pool(mp.cpu_count())
buffer_include = []
include_file_filters = {}
for include in grouper([list_of_file_path]):
    current = mp.current_process()
    print 'Running: ', current.name, current._identity
    results = p.map(parse_file, include) 
    buffer_include += results[0]
    include_file_filters.update(results[1])
p.close()

上面用到的grouper函数定义为

def grouper(iterable, padvalue=None):
    return itertools.izip_longest(*[iter(iterable)]*mp.cpu_count(), fillvalue=padvalue)

我在 4 核 CPU(英特尔酷睿 i3-6006U)中使用 Python 2.7.15。

当我运行我的代码时,我看到所有 CPU 都在 100% 使用,Python 控制台中的输出为 Running: MainProcess (),但没有出现其他任何情况。似乎我的代码在指令results = p.map(parse_file, include) 处被阻止并且无法继续(当我一次解析一个文件而不进行并行化时,该代码运行良好)。

  • 出了什么问题?
  • 如何处理parse_file函数给出的结果 在并行执行期间?我的方法是否正确?

提前感谢您的支持

编辑

感谢 darc 的回复。我试过你的建议,但问题是一样的。如果我像这样将代码放在 if 语句下,问题似乎得到了解决

if __name__ == '__main__':

这可能与 Python IDLE 处理进程的方式有关。出于开发和调试的原因,我使用 IDLE 环境。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    根据pythondocs:

    map(func, iterable[, chunksize]) map() 内置函数的并行等效项(尽管它仅支持一个可迭代参数)。它会一直阻塞,直到结果准备好。

    此方法将可迭代对象分割成多个块,将它们作为单独的任务提交给进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

    因为它阻塞了您的进程,请等待解析文件完成。

    由于 map 已经对可迭代对象进行了分块,您可以尝试将所有包含作为一个大的可迭代对象一起发送。

    import multiprocessing as mp
    p = mp.Pool(mp.cpu_count())
    buffer_include = []
    include_file_filters = {}
    results = p.map(parse_file, list_of_file_path, 1) 
    buffer_include += results[0]
    include_file_filters.update(results[1])
    p.close()
    

    如果你想保持原来的循环使用 apply_async,或者如果你使用 python3 你可以使用 ProcessPoolExecutor submit() 函数并读取结果。

    【讨论】:

      猜你喜欢
      • 2019-04-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多