【问题标题】:Using numpy array in shared memory slow when synchronizing access同步访问时在共享内存中使用 numpy 数组很慢
【发布时间】:2015-10-07 19:47:39
【问题描述】:

我编写了一个程序,它接收大型数据集作为输入(~150mb 文本文件),对它们进行一些数学运算,然后在直方图中报告结果。必须执行的计算次数与数据集中两个点的组合数量成正比,对于 100 万个点的数据集来说,这是非常大的(约 50 亿)。

我希望通过使用 Python 的 multiprocessing 模块将部分直方图数据的计算分配给各个进程,同时将最终直方图的数组保存在共享内存中,以便每个进程可以添加到它,从而减少一些计算时间.

我已经使用multiprocessing 创建了这个程序的工作版本,通常基于this answer 中描述的过程,但是我发现它实际上比我之前编写的非并行版本慢了一点。我尝试取消对共享阵列的同步访问,发现这会显着加快速度,但会导致部分数据丢失。

下面是代码的概要:

import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0

def init(histo):
    global histo_shared
    histo_shared = histo

def to_np_array(mp_array):
    return np.frombuffer(mp_array.get_obj())

# synchronize access to shared array
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

def main():
    # read in data and calculate no. of iterations
    global DATA
    DATA = np.loadtxt("data.txt")
    it = len(DATA) // 2

    # create shared array 
    histo_shared = Array('l',  BINS)

    # write to shared array from different processes
    p = Pool(initializer=init, initargs=(histo_shared,))
        for i in range(1, it + 1):
            p.apply_async(calc_sync, [i])
    p.close()
    p.join()

    histo_final = to_np_array(histo_shared)
    np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
    main()

这里有什么我遗漏的东西会对我的表现产生严重影响吗?有什么办法可以解决这个问题以加快速度?

非常感谢任何见解或建议!

【问题讨论】:

    标签: python arrays performance numpy multiprocessing


    【解决方案1】:

    您实际上是在锁定可能获得的任何并行性,因为在您处理的整个过程中数据都处于锁定状态。

    当这个方法

    def calc_sync(i):
        with histo_shared.get_lock():
            calc_histo(i)
    

    正在执行,您在处理直方图时锁定了整个共享数据集。还要注意

    def calc_histo(i):
        # create new array 'd_new' by doing some math on DATA using argument i
        histo = to_np_array(histo_shared)
        histo += np.histogram(d_new, bins=BINS,
            range=(DMIN, DMAX))[0].astype(np.int32)
    

    没有对 i 做任何事情,所以看起来你正在重新处理相同的数据。什么是 d_new?我在您的列表中没有看到它。

    理想情况下,您应该做的是获取大型数据集,将其切成若干块并单独处理,然后组合结果。只锁定共享数据,不锁定处理步骤。这可能看起来像这样:

    def calc_histo(slice):
        # process the slice asyncronously
        return np.histogram(slice, bins=BINS,
            range=(DMIN, DMAX))[0].astype(np.int32)
    
    def calc_sync(start,stop):
    
        histo = None
    
        # grab a chunk of data, you likely don't need to lock this
        histo = raw_data[start:stop]
    
        # acutal calculation is async
        result = calc_histo(histo)
    
        with histo_shared.get_lock():
             histo_shared += result
    

    对于成对数据:

    def calc_sync(part1,part2):
    
        histo = None
        output = [] # or numpy array
        # acutal calculation is async
        for i in range(part1):
            for j in range(part2):
                  # do whatever computation you need and add it to output
    
        result = calc_histo(output)
    
        with histo_shared.get_lock():
             histo_shared += result
    

    现在

     p = Pool(initializer=init, initargs=(histo_shared,))
     for i in range(1, it + 1,slice_size):
         for j in range(1, it + 1,slice_size):
             p.apply_async(calc_sync, [histo_shared[j:j+slice_size], histo_shared[i:i+slice_size])
    

    换句话说,我们对数据进行成对切割,生成相关数据,然后将它们放入直方图中。您需要的唯一真正同步是在直方图中组合数据时

    【讨论】:

    • 不幸的是,我需要整个数组来计算直方图数据,因为它涉及比较数组中的每个元素。 i 用于确定它们的比较方式;抱歉,如果我对此不清楚!无论如何,您对锁定处理步骤是正确的。我将您的解决方案改编为我的代码,现在它的工作速度更快。谢谢!
    • 我不太清楚你想如何并行化它。如果您要对数组中的元素进行成对比较,那么您可以取数组的两个切片,请参阅我的编辑
    猜你喜欢
    • 1970-01-01
    • 2016-06-12
    • 1970-01-01
    • 2019-10-22
    • 2011-02-04
    • 2021-11-26
    • 1970-01-01
    • 2012-01-21
    • 2012-12-31
    相关资源
    最近更新 更多