【发布时间】:2016-07-02 23:30:41
【问题描述】:
我正在尝试了解 multiprocessing.Pool 的工作原理,并且我开发了一个最小示例来说明我的问题。简而言之,我使用 pool.map 来并行化在数组上运行的 CPU 绑定函数,遵循示例 Dead simple example of using Multiprocessing Queue, Pool and Locking。当我遵循这种模式时,我只能获得 4 个内核的适度加速,但如果我改为手动将数组分块为 num_threads,然后在这些块上使用 pool.map,我发现加速因子大大超过4x,这对我来说毫无意义。详细信息如下。
首先,函数定义。
def take_up_time():
n = 1e3
while n > 0:
n -= 1
def count_even_numbers(x):
take_up_time()
return np.where(np.mod(x, 2) == 0, 1, 0)
现在定义我们将进行基准测试的函数。
首先是串行运行的函数:
def serial(arr):
return np.sum(map(count_even_numbers,arr))
现在以“标准”方式使用 Pool.map 的函数:
def parallelization_strategy1(arr):
num_threads = multiprocessing_count()
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,arr)
pool.close()
return np.sum(result)
最后,我手动将数组分块然后在块上运行 Pool.map 的第二种策略(由于python numpy split array into unequal subarrays而导致的拆分解决方案)
def split_padded(a,n):
""" Simple helper function for strategy 2
"""
padding = (-len(a))%n
if padding == 0:
return np.split(a, n)
else:
sub_arrays = np.split(np.concatenate((a,np.zeros(padding))),n)
sub_arrays[-1] = sub_arrays[-1][:-padding]
return sub_arrays
def parallelization_strategy2(arr):
num_threads = multiprocessing_count()
sub_arrays = split_padded(arr, num_threads)
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,sub_arrays)
pool.close()
return np.sum(np.array(result))
这是我的数组输入:
npts = 1e3
arr = np.arange(npts)
现在我使用 IPython %timeit 函数来运行我的计时,对于 1e3 点,我得到以下结果:
- 串行:10 个循环,3 个循环中的最佳:每个循环 98.7 毫秒
- parallelization_strategy1:10 次循环,3 次中的最佳:每个循环 77.7 毫秒
- parallelization_strategy2:10 次循环,最好的 3 次:每个循环 22 毫秒
由于我有 4 个内核,因此策略 1 的加速幅度适中,令人失望,而策略 2 的加速比最大的 4 倍令人怀疑。
当我将 npts 增加到 1e4 时,结果更加令人困惑:
- 串行:1 次循环,最好的 3 次:每个循环 967 毫秒
- parallelization_strategy1:1 次循环,最好的 3 次:每个循环 596 毫秒
- parallelization_strategy2:10 个循环,3 个循环中的最佳:每个循环 22.9 毫秒
所以混淆的两个来源是:
- 策略 2 比幼稚的理论极限快得多
- 由于某种原因,npts=1e4 的 %timeit 仅触发串行和策略 1 的 1 个循环,但为策略 2 触发 10 个循环。
【问题讨论】:
-
您是否比较了不同策略的结果?
标签: python performance numpy parallel-processing python-multiprocessing