【问题标题】:more efficient way to calculate distance in numpy?在numpy中计算距离的更有效方法?
【发布时间】:2023-03-18 14:53:01
【问题描述】:

我有一个关于如何在 numpy 中尽可能快地计算距离的问题,

def getR1(VVm,VVs,HHm,HHs):
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    R1*=R1
    R+=R1
    del R1
    print "R1\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    # uses 17.5Gb ram
    return R


def getR2(VVm,VVs,HHm,HHs):
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
    print "R2\t",time.time()-t0,R.shape, #14.5291359425 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 26Gb ram
    return R


def getR3(VVm,VVs,HHm,HHs):
    from numpy.core.umath_tests import inner1d
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = inner1d(deltas, deltas)
    print "R3\t",time.time()-t0, R.shape, #12.6972110271 (108225, 10500)
    print numpy.max(R) #4176.26290975
    #Uses 26Gb
    return R


def getR4(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'sqeuclidean') #.T
    print "R4\t",time.time()-t0, R.shape, #17.7022118568 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 9 Gb ram
    return R

def getR5(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'euclidean') #.T
    print "R5\t",time.time()-t0, R.shape, #15.6070930958 (108225, 10500)
    print numpy.max(R) #64.6240118667
    # uses only 9 Gb ram
    return R

def getR6(VVm,VVs,HHm,HHs):
    from scipy.weave import blitz
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    blitz("R=R*R") # R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    blitz("R1=R1*R1") # R1*=R1
    blitz("R=R+R1") # R+=R1
    del R1
    print "R6\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    return R

结果如下:

R1  11.7737319469 (108225, 10500) 4909.66881791
R2  15.1279799938 (108225, 10500) 4909.66881791
R3  12.7408981323 (108225, 10500) 4909.66881791
R4  17.3336868286 (10500, 108225) 4909.66881791
R5  15.7530870438 (10500, 108225) 70.0690289494
R6  11.670968771 (108225, 10500) 4909.66881791

虽然最后一个给出 sqrt((VVm-VVs)^2+(HHm-HHs)^2),而其他给出 (VVm-VVs)^2+(HHm-HHs)^2,这不是真的很重要,因为否则在我的代码中,我为每个 i 取 R[i,:] 的最小值,并且 sqrt 无论如何都不会影响最小值,(如果我对距离感兴趣,我只取 sqrt(value ),而不是对整个数组进行 sqrt,因此实际上没有时间差异。

问题仍然存在:为什么第一个解决方案是最好的,(第二个和第三个较慢的原因是因为 deltas=... 需要 5.8 秒,(这也是这两种方法需要 26Gb 的原因)),并且为什么 skeuclidean 比 euclidean 慢?

sqeuclidean 应该只做 (VVm-VVs)^2+(HHm-HHs)^2,而我认为它做了一些不同的事情。任何人都知道如何找到该方法的源代码(C 或底部的任何内容)?我认为它确实 sqrt((VVm-VVs)^2+(HHm-HHs)^2)^2 (我能想到它为什么会比 (VVm-VVs)^2+(HHm-HHs) 慢的唯一原因) ^2 - 我知道这是一个愚蠢的理由,有人有更合乎逻辑的理由吗?)

由于我对 C 一无所知,我将如何将其与 scipy.weave 内联?并且该代码是否可以像使用 python 一样正常编译?还是我需要为此安装特殊的东西?

编辑:好的,我用 scipy.weave.blitz 尝试过,(R6 方法),这稍微快了一点,但我认为比我了解更多 C 的人仍然可以提高这个速度?我只是取了格式为 a+=b 或 *= 的行,并查看了它们在 C 中的情况,并将它们放入 blitz 语句中,但我想如果我将带有 flatten 和 newaxis 的语句放入C 也是如此,它也应该更快,但我不知道我该怎么做(知道 C 的人可能会解释吗?)。现在,闪电战的东西和我的第一种方法之间的差异还不够大,我猜真的是由 C 和 numpy 引起的?

我猜像 deltas=... 之类的其他方法也可以更快,当我将它放入 C 时?

【问题讨论】:

  • 考虑尝试类似jakevdp.github.io/blog/2013/06/15/numba-vs-cython-take-2 的方法(尤其是“numpy with broadcasting”部分)
  • 你可以通过不为R分配内存来节省几秒钟的时间(即,只使用R1 += R3)。
  • @bogatron 是的,与 R1*=R1 相同,但仍然不会将其减少到 1 秒左右,(我认为当它从 numpy 完全在 C 中时应该会发生)?
  • 不会缩短到 1 秒,但如果您使用 32 位浮点数,这将节省您分配大约 4 GB 的 RAM,这很重要。如果它可以让你避免使用交换,那么这将是一个显着的改进。考虑到它需要多少内存(除非你有很多 RAM 并且是显着的多线程),我会很惊讶它在 C 中可以降低到 1 秒(即使没有 python)
  • 获得了大约 50Gb 的内存,所以还没有交换

标签: python memory numpy


【解决方案1】:

只要有乘法和求和,请尝试使用其中一种点积函数或np.einsum。由于您正在预先分配数组,而不是为水平和垂直坐标设置不同的数组,因此将它们堆叠在一起:

precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat - measured_flat[:, None, :]

从这里开始,最简单的是:

dist = np.einsum('ijk,ijk->ij', deltas, deltas)

你也可以试试:

from numpy.core.umath_tests import inner1d
dist = inner1d(deltas, deltas)

当然还有SciPy的空间模块cdist

from scipy.spatial.distance import cdist
dist = cdist(precomputed_flat, measured_flat, 'euclidean')

编辑 我无法在如此大的数据集上运行测试,但这些时间安排相当有启发性:

len_a, len_b = 10000, 1000

a = np.random.rand(2, len_a)
b =  np.random.rand(2, len_b)
c = np.random.rand(len_a, 2)
d = np.random.rand(len_b, 2)

In [3]: %timeit a[:, None, :] - b[..., None]
10 loops, best of 3: 76.7 ms per loop

In [4]: %timeit c[:, None, :] - d
1 loops, best of 3: 221 ms per loop

对于上述较小的数据集,通过在内存中以不同方式排列数据,我可以稍微加快使用 scipy.spatial.distance.cdist 的方法并将其与 inner1d 匹配:

precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat[:, None, :] - measured_flat

import scipy.spatial.distance as spdist
from numpy.core.umath_tests import inner1d

In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
10 loops, best of 3: 146 ms per loop

In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
10 loops, best of 3: 145 ms per loop

In [15]: %timeit spdist.cdist(a.T, b.T)
10 loops, best of 3: 124 ms per loop

In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
10 loops, best of 3: 163 ms per loop

【讨论】:

  • 替代np.einsum 可以使用np.tensordot(),它也有一个非常灵活的符号...
  • 遗憾的是,您建议的所有 3 种方法都较慢,(deltas=... 已经需要六秒钟,这就是它们较慢的原因)
  • 有趣的是内存管理如何破坏了最好的计划......我不完全理解发生了什么,但请参阅我的编辑。你可能想在你的大数组上尝试上述方法,看看时间是否有不同的表现,但使用 scipy 可能会有一些优势。
  • 你最快的方式仍然比我的慢,但我认为这是因为在我的情况下,我只是做 (x-x')**2+(y-y')**2, (稍后,我从中得到最小值,但是取它的 sqrt 不会改变最小值的位置,所以我在计算中不这样做,而 cdist 也这样做(我假设),(那个[15]:在 15.7 秒超时,而我的在 11.8 秒超时,但我认为这是由于 cdist 例程中采用了 sqrt,但我真的是这样想的,如果有一些没有 sqrt 的例程它,它最终会更快
  • 根据文档,spdist.cdist(a.T, b.T, 'sqeuclidean') 应该这样做,现在无法测试。无论如何,有趣的是,当您使用大量内存时,内存处理如何成为一切!
猜你喜欢
  • 2018-11-30
  • 1970-01-01
  • 1970-01-01
  • 2019-07-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-19
相关资源
最近更新 更多