【问题标题】:Multiprocessing a for loop?多处理一个for循环?
【发布时间】:2013-12-10 00:53:35
【问题描述】:

我有一个数组(称为data_inputs),其中包含数百个天文图像文件的名称。然后对这些图像进行处理。我的代码有效,并且需要几秒钟来处理每个图像。但是,它一次只能做一张图片,因为我正在通过for 循环运行数组:

for name in data_inputs:
    sci=fits.open(name+'.fits')
    #image is manipulated

没有理由我必须先修改图像,所以是否可以利用我机器上的所有 4 个内核,每个内核在不同的图像上通过 for 循环运行?

我已阅读有关 multiprocessing 模块的信息,但我不确定如何在我的情况下实现它。 我很想让multiprocessing 工作,因为最终我必须在 10,000 多张图像上运行它。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    你可以简单地使用multiprocessing.Pool:

    from multiprocessing import Pool
    
    def process_image(name):
        sci=fits.open('{}.fits'.format(name))
        <process>
    
    if __name__ == '__main__':
        pool = Pool()                         # Create a multiprocessing Pool
        pool.map(process_image, data_inputs)  # process data_inputs iterable with pool
    

    【讨论】:

    • 最好使用:pool = Pool(os.cpu_count()) 这是使用多处理的更通用方式。
    • 注意:os.cpu_count() 是在 Python 3.4 中添加的。对于 Python 2.x,请使用 multiprocessing.cpu_count()
    • Pool()Pool(os.cpu_count()) 相同
    • 详细说明@Tim 的评论-Pool() 在没有processes 的值的情况下调用与Pool(processes=cpu_count()) 相同,无论您使用的是Python 3 还是Python 2 - 所以最佳实践中的任何一个版本是使用Pool()docs.python.org/2/library/multiprocessing.html
    • @LiorMagen ,如果我没记错的话,使用 Pool(os.cpu_count()) 将使操作系统冻结,直到处理结束,因为您不会给操作系统留下任何空闲内核。对于很多用户来说 Pool(os.cpu_count() - 1) 可能是更好的选择
    【解决方案2】:

    你可以使用multiprocessing.Pool:

    from multiprocessing import Pool
    class Engine(object):
        def __init__(self, parameters):
            self.parameters = parameters
        def __call__(self, filename):
            sci = fits.open(filename + '.fits')
            manipulated = manipulate_image(sci, self.parameters)
            return manipulated
    
    try:
        pool = Pool(8) # on 8 processors
        engine = Engine(my_parameters)
        data_outputs = pool.map(engine, data_inputs)
    finally: # To make sure processes are closed in the end, even if errors happen
        pool.close()
        pool.join()
    

    【讨论】:

    • 我无法理解这里的“data_inputs”是什么。你还没有定义它。我应该给它什么价值?
    • 它实际上源于 alko 的回答,我引用了他的评论(参见代码块):“proces data_inputs iterable with pool”。所以data_inputs 是一个可迭代的(就像在标准的map 中一样)。
    • python doc 仅表明可以将函数传递给pool.map(func, iterable[, chunksize])。当传递一个对象时,这个对象会被所有进程共享吗?因此,我可以让所有进程写入对象中的同一列表self.list_ 吗?
    【解决方案3】:

    或者

    with Pool() as pool: 
        pool.map(fits.open, [name + '.fits' for name in datainput])
    

    【讨论】:

    • TypeError: 'Pool' object is not callable
    • 对不起,我的错误是“pool.map”,而不仅仅是“pool”。我修好了。
    【解决方案4】:

    如果您只使用 for 循环来迭代可迭代对象,我建议使用 imap_unorderedchunksize。计算后,它将立即从每个循环返回结果。 map 等待计算所有结果,因此处于阻塞状态。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-06-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-12
      • 2018-08-06
      • 2022-01-03
      • 2020-09-25
      相关资源
      最近更新 更多