【问题标题】:Python multiprocessing pool.map with multiples arguments具有多个参数的 Python 多处理 pool.map
【发布时间】:2018-04-03 08:17:17
【问题描述】:

我需要一些帮助,因为我尝试了两天,但我不知道该怎么做。我有函数 compute_desc 接受多个参数(准确地说是 5 个),我想并行运行它。 我现在有这个:

def compute_desc(coord, radius, coords, feat, verbose):
    # Compute here my descriptors
    return my_desc # numpy array (1x10 dimensions)

def main():
    points = np.rand.random((1000000, 4))
    coords = points[:, 0:3]
    feat = points[:, 3]
    all_features = np.empty((1000000, 10))
    all_features[:] = np.NAN
    scales = [0.5, 1, 2]
    for radius in scales:
        for index, coord in enumerate(coords):
            all_features[index, :] = compute_desc(coord,
                                                  radius,
                                                  coords,
                                                  feat,
                                                  False)

我想并行化这个。我看到了几个使用 Pool 的解决方案,但我不明白它是如何工作的。

我尝试使用pool.map(),但我只能向函数发送一个参数。

这是我的解决方案(它不起作用):

all_features = [pool.map(compute_desc, zip(point, repeat([radius, 
                                                          coords,
                                                          feat, 
                                                          False]
                                                         ) 
                                           ) 
                         )]

但我怀疑它是否可以与 numpy 数组一起使用。

编辑

这是我的最小代码池(现在可以使用):

import numpy as np
from multiprocessing import Pool
from itertools import repeat

def compute_desc(coord, radius, coords, feat, verbose):
    # Compute here my descriptors
    my_desc = np.rand.random((1, 10))
    return my_desc

def compute_desc_pool(args):
    coord, radius, coords, feat, verbose = args
    compute_desc(coord, radius, coords, feat, verbose)

def main():
    points = np.random.rand(1000000, 4)
    coords = points[:, 0:3]
    feat = points[:, 3]
    scales = [0.5, 1, 2]
    for radius in scales:
        with Pool() as pool:
            args = zip(points, repeat(radius),
                       repeat(coords),
                       repeat(feat),
                       repeat(kdtree),
                       repeat(False))
            feat_one_scale = pool.map(compute_desc_pool, args)

        feat_one_scale = np.array(feat_one_scale)
        if radius == scales[0]:
            all_features = feat_one_scale
        else: 
            all_features = np.hstack([all_features, feat_one_scale])

    # Others stuffs

【问题讨论】:

    标签: python parallel-processing multiprocessing


    【解决方案1】:

    通用解决方案是将一个元组序列传递给Pool.map,每个元组为您的工作函数保存一组参数,然后解包元组在工作函数中。

    因此,只需将您的函数更改为仅接受一个参数,参数的元组,您已经使用 zip 准备好并传递给 Pool.map。然后简单地将args 解包到变量中:

    def compute_desc(args):
        coord, radius, coords, feat, verbose = args
        # Compute here my descriptors
    

    另外,Pool.map 也应该与 numpy 类型一起使用,因为毕竟它们是有效的 Python 类型。

    只要确保正确zip 5 个序列,这样您的函数就会收到一个 5 元组。您无需在coords 中迭代pointzip 将为您完成:

    args = zip(coords, repeat(radius), repeat(coords), repeat(feat), repeat(False))
    # args is a list of [(coords[0], radius, coords, feat, False), (coords[1], ... )]
    

    (如果你这样做了,并将point 作为zip 的第一个序列,zip 将遍历该点,在本例中是一个 3 元素数组)。

    您的Pool.map 行应如下所示:

    for radius in scales:
        args = zip(coords, repeat(radius), repeat(coords), repeat(feat), repeat(False))
        feat_one_scale = [pool.map(compute_desc_pool, args)]
        # other stuff
    

    针对您的案例特定的解决方案,其中除一个以外的所有参数都是固定的,可以使用functools.partial(正如另一个答案所暗示的那样)。此外,您甚至不需要在第一个参数中解压coords,只需在coords 中传递索引[0..n],因为您的工作函数的每次调用都已经收到完整的coords 数组。

    【讨论】:

    • 不,它不起作用,我用中间函数尝试了你的解决方案(我想保留我原来的函数),但我有以下错误:ValueError: too many values to unpack (expected 6)
    • 请用准确的代码更新您的问题,以便可以重现错误。并且请写下来,以便任何人都可以复制/粘贴并尝试它(不会丢失导入等)。见stackoverflow.com/help/mcve
    • 太棒了!所以问题出在你的zip 表达式中。检查我的更新。
    • 现在可以使用了!但我有点意外,我在一个有 56 个进程的服务器上运行这个脚本,没想到我的代码运行速度快了 56 倍,但至少快了 3 倍
    • 并添加 Pool(),当我没有太多点时增加计算时间(例如 1000 左右)
    【解决方案2】:

    我从您的示例中假设这五个参数中的四个对于所有对compute_desc_pool 的调用都是不变的。如果是这样,那么您可以使用partial 来执行此操作。

    from functools import partial
    ....
    
    def compute_desc_pool(coord, radius, coords, feat, verbose):    
        compute_desc(coord, radius, coords, feat, verbose)
    
    def main():
        points = np.random.rand(1000000, 4)
        coords = points[:, 0:3]
        feat = points[:, 3]
        feat_one_scale = np.empty((1000000, 10))
        feat_one_scale[:] = np.NAN
        scales = [0.5, 1, 2]
        pool = Pool()
        for radius in scales:
            feat_one_scale = [pool.map(partial(compute_desc_pool, radius, coords, 
                                               feat, False), coords)]
    

    【讨论】:

    • 在你的例子中,为什么要定义compute_desc_pool,当它采用与compute_desc相同的参数时?
    • 因为这是原始问题的结构。当然也可以简化。问题显然是关于在使用 Pool.map() 时将静态参数与映射参数一起传递,而不是对仅作为示例提供的代码进行简单的简化。
    猜你喜欢
    • 2011-07-23
    • 2017-03-24
    • 2019-01-30
    • 1970-01-01
    • 2019-12-11
    • 1970-01-01
    • 2015-01-12
    相关资源
    最近更新 更多