【问题标题】:Fastest way to get the minimum value of data array in another paired bin array在另一个成对的 bin 数组中获取数据数组最小值的最快方法
【发布时间】:2021-08-25 04:29:04
【问题描述】:

我有三个一维数组:

  • idxs:索引数据
  • weightsidxs中各个指标的权重
  • bins: 用于计算其中最小重量的 bin。

这是我目前使用idxs检查名为weights的数据在哪个bin中的方法,然后计算binned weights的最小值/最大值:

  1. 获取slices,它显示每个idxs 元素属于哪个bin。
  2. 同时对slicesweights进行排序。
  3. 计算每个 bin(切片)中 weights 的最小值。

numpy 方法

import random
import numpy as np

# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

# get which bin idxs belong to
slices = np.digitize(idxs, bins)

# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]

# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]

# get index of each first unque slices
unique_slices, unique_index = np.unique(valid_slices_sort, return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort, unique_index)

# save results
res = np.full((out_size), np.nan)
res[unique_slices-1] = res_sub

print(res)

结果:

array([ 3., nan,  5., nan, nan, nan, nan, nan, nan, nan])

如果我将out_size 增加到 1e7 并打乱数据,速度(从 np.digitize 到最后)很慢:

13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

而且,这是每个部分的消耗时间:

np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

np.digitize() 花费最多的时间:10.8 秒。而且,接下来是argsort: 2.02 秒。

我还检查了使用np.histogram计算mean所消耗的时间:

counts, _ = np.histogram(idxs, bins=out_size, range=(0, out_size))
sums, _ = np.histogram(idxs, bins=out_size, range=(0, out_size), weights = weights, density=False)
mean = sums / np.where(counts == 0, np.nan, counts)

33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

这类似于我计算最小值的方法。

scipy方法

from scipy.stats import binned_statistic
statistics, _, _ = binned_statistic(idxs, weights, statistic='min', bins=bins)

print(statistics)

结果略有不同,但对于较长(1e7)的混洗数据,速度要慢得多(x6):

array([ 3., nan,  5.])

1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

总结

我想找出一个更快的方法。如果该方法也适用于dask,那就太好了!

用户案例

这是我的真实数据 (1D) 的样子:

【问题讨论】:

  • 对于您的实际用例,您是否需要维护 binned_statistic 的权重功能? (例如,用于计算“平均值”)
  • @SultanOrazbayev 权重只是数据。我不需要乘以它来计算。对不起,误导的名字。我只想获取每个 bin 中数据的最小值或最大值或平均值。
  • 好的,没有binned_statistic 很容易做到最小值/最大值,但是对于计算平均值,权重很重要......我会想一个好的答案。
  • 平均而言,我知道快速方法:只需使用直方图计算总和和计数。请随时发布最小/最大解决方案;)
  • 对您的数据进行排序/排序有任何假设吗?数字化或计算直方图通常涉及排序,因此您将很难击败 O(nlog(n)),除非您可以对数据的排序做出假设。

标签: python pandas numpy scipy dask


【解决方案1】:

SultanOrazbayev 展示了一种快速的方法;我会添加一个很酷的。

mask = bins[:, None] == idxs[None, :]
result = np.nanmin(np.where(mask, weights, np.nan), axis=-1)
# Note: may produce (expected) runtime warning if bin has no values

当然,np.nanmaxnp.nanmean等也可以。

以上假设您的垃圾箱确实是单个值。如果它们是范围,则构建掩码需要稍微多一点工作

lower_mask = idxs[None, :] >= bins[:, None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1, ...] = idxs[None, :] < bins[1:, None]
upper_mask[-1, ...] = False

mask = lower_mask & upper_mask

此时您可以像上面一样使用np.nanmin


Ofc np.where 和创建掩码的广播将创建具有各自数据类型的新形状数组 (len(bins), len(idxs))。如果您不关心这些,那么以上内容可能就足够了。

如果这是一个问题(因为您迫切需要 RAM),那么我的第一个建议是购买更多 RAM。如果 - 出于某种愚蠢的原因(例如,金钱) - 这不起作用,您可以通过在手动重新跨步视图中使用蒙版数组来避免 weights 的副本进入 weights

import numpy.ma as ma

mask = ...

restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=(bins.size, idxs.size), strides=(0, idxs.strides[0]))
masked = ma.masked_array(restrided_weights, mask=~mask, fill_value=np.nan, dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)

这避免了weights 的副本和上述运行时警告。

如果你甚至没有足够的内存来构造mask,那么你可以尝试分块处理数据。

上次我检查过,Dask 在使用手动跨步数组时曾经有过有趣的行为。虽然对此进行了一些工作,因此您可能需要仔细检查是否已解决,在这种情况下,您可以愉快地并行化上述操作。


更新基于您对该答案和其他答案的进一步 cmet:

您可以分块进行此计算,以避免由于您的大量 bin(1e4 大小)导致的内存问题。将具体数字放入完整示例并添加进度条表示在单核上运行时间

import numpy.ma as ma
from tqdm import trange
import numpy as np
import random

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

chunk_size = 100

# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size, len(idxs)), dtype=bool)


result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, ...] = ~((bins[low:high, None] <= idxs[None, :]) & (extended_bins[low+1:high+1, None] > idxs[None, :]))
    mask = mask_buffer[:chunk_size, ...]
    restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=mask.shape, strides=(0, idxs.strides[0]))
    masked = ma.masked_array(restrided_weights, mask=mask, fill_value=np.nan, dtype=np.float64)
    result[low:high] = masked.min(axis=-1).filled(np.nan)

奖励:对于minmax ,您可以使用一个很酷的技巧:根据@987654337 对idxsweights 进行排序@(最小值为升序,最大值为降序)。这样,您可以立即查找最小值/最大值,并且可以完全避免屏蔽数组和自定义步幅。这依赖于 np.argmax 的一些没有很好记录的行为,它对布尔数组进行快速传递并且不搜索整个数组。

它只适用于这两种情况,你必须回退到上面的更复杂的东西(平均),但对于这两种情况,它会再减少约 70% 并且在单核时钟上运行

# fast min/max
from tqdm import trange
import numpy as np

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs


order = np.argsort(weights)
weights_sorted = np.empty((len(weights) + 1), dtype=np.float64)
weights_sorted[:-1] = weights[order]
weights_sorted[-1] = np.nan

idxs_sorted = idxs[order]

extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity

# preallocate buffers to avoid array creation in main loop
chunk_size = 1000
mask_buffer = np.empty((chunk_size, len(idxs) + 1), dtype=bool)
mask_buffer[:, -1] = True
result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, :-1] = (bins[low:high, None] <= idxs_sorted[None, :]) & (extended_bins[low+1:high+1, None] > idxs_sorted[None, :])
    mask = mask_buffer[:chunk_size, ...]
    weight_idx = np.argmax(mask, axis=-1)

    result[low:high] = weights_sorted[weight_idx]

【讨论】:

  • 谢谢,这很有趣,让我想在某个时候用 dask.arrays 试试这个。
  • @FirefoxMetzger 你的方法太棒了!我用我的例子进行了测试,发现了一个奇怪的问题:如你所说,将长度增加到 1e5 时出现此内存错误。但是,将其增加到 1e7,第一种方法返回一个值:nan,而第二种方法仍然出现内存错误。
  • @SultanOrazbayev 如果我将这种方法与这样的 dask 一起应用:bins = da.array(bins)idxs = da.array(idxs),我会得到与上述类似的错误。我想简单地转换为 dask 数组根本没有任何改进。你知道什么是满足它的正确方法吗?
  • @XinZhang 与 dask 数组一起应用比将所有内容都包含在 da.array...
  • @XinZhang 我已经更新了答案,以回应您对内存使用的担忧。当然,如果您愿意,您可以重构上述方法以使用 dask 数组和多线程;就个人而言,我认为 np.where 产生单个 nan 结果,我不确定为什么会发生这种情况,但可以在我的机器上重现它。
【解决方案2】:

使用dask.dataframepd.cut 可以快速实现这一目标,我首先展示的是pandas 版本:

import numpy as np
from scipy.stats import binned_statistic as bs
import pandas as pd

nrows=10**7

df = pd.DataFrame(np.random.rand(nrows, 2), columns=['x', 'val'])

bins = np.linspace(df['x'].min(), df['x'].max(), 10)

df['binned_x'] = pd.cut(df['x'], bins=bins, right=False)

result_pandas = df.groupby('binned_x')['val'].min().values
result_scipy = bs(df['x'], df['val'], 'min', bins=bins)[0]

print(np.isclose(result_pandas, result_scipy))
# [ True  True  True  True  True  True  True  True  True]

现在要从 pandas 转到 dask,您需要确保 bin 在分区之间保持一致,因此请查看 here。一旦每个分区都被一致地分箱,您想要应用所需的操作(最小/最大/总和/计数):

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=10)

def f(df, bins):
    df = df.copy()
    df['binned_x'] = pd.cut(df['x'], bins=bins, right=False)
    result = df.groupby('binned_x', as_index=False)['val'].min()
    return result

result_dask = ddf.map_partitions(f, bins).groupby('binned_x')['val'].min().compute()

print(np.isclose(result_pandas, result_dask))
# [ True  True  True  True  True  True  True  True  True]

在我的笔记本电脑上,第一个代码大约需要 7 3 秒,第二个代码大约快 10 倍(忘了我在重复计算 pandas 和 scipy执行相同的操作)。分区有一定的空间,但这取决于上下文,所以你可以尝试优化你的数据/硬件。

更新:请注意,这种方法适用于最小值/最大值,但对于平均值,您需要计算总和和计数,然后将它们相除。可能有一种很好的方法可以在一次完成此计算时跟踪权重,但这可能不值得增加代码复杂性。

【讨论】:

  • 谢谢,这种方法在没有内存错误的情况下效果很好。 FirefoxMetzger 的方法可以和 dask 结合吗?这可能会提高速度并同时节省内存。
  • 是的,我确信这是可能的。
  • @XinZhang 在重新阅读苏丹的答案时,我意识到他/她已经修复了 10 的数量 bins 并将 idxsweights 缩放为 1e7。我们是否正在寻找一种解决方案来保持垃圾箱的数量较少,或者垃圾箱的数量1e7(或某个数量级)?
  • @FirefoxMetzger 每个数据的确切长度显示为问题描述末尾User Case 中称为dim_0 的x 轴。 idxs 和 weights 的数量在 1.5e5 左右,bin 的数量在 3.8e4 左右。
猜你喜欢
  • 2021-09-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多