【问题标题】:conditional vectorized calculation with numpy arrays without using direct masking使用 numpy 数组进行条件向量化计算而不使用直接屏蔽
【发布时间】:2020-11-27 03:59:34
【问题描述】:

following up on another question

import numpy as np

repeat=int(1e5)
r_base = np.linspace(0,4,5)
a_base = 2
np.random.seed(0)
r_mat = r_base * np.random.uniform(0.9,1.1,(repeat,5))

a_array = a_base * np.random.uniform(0.9,1.1, repeat)


# original slow approach
def func_vetorized_level1(r_row, a):
    if r_row.mean()>2:
        result = np.where((r_row >= a), r_row - a, np.nan)
    else:
        result = np.where((r_row >= a), r_row + a, 0)
    return result
# try to broadcast this func to every row of r_mat using list comprehension
def func_list_level2(r_mat, a_array):
    res_mat = np.array([func_vetorized_level1(this_r_row, this_a) 
                        for this_r_row, this_a in zip(r_mat, a_array)])
    return res_mat

# faster with direct masking, but with unnecessary more calculation
def f_faster(r_mat,a_array):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)

    out[row_mask & elem_mask] = (r_mat - a)[row_mask & elem_mask]
    out[~row_mask & elem_mask] = (r_mat + a)[~row_mask & elem_mask]
    out[row_mask & ~elem_mask] = np.nan
    out[~row_mask & ~elem_mask] = 0
    
    return out

# fastest with ufunc in numpy as suggested by @mad_physicist
def f_fastest(r_mat,a_array):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)


    np.subtract(r_mat, a, out=out, where=row_mask & elem_mask)
    np.add(r_mat, a, out=out, where=~row_mask & elem_mask)
    out[row_mask & ~elem_mask] = np.nan
    out[~row_mask & ~elem_mask] = 0
    
    return out

我想问是否有可能有一个可以使用的用户定义函数,或者利用最快的方法?我考虑过使用索引,但发现它具有挑战性,因为使用[row_ind, co_ind] 的切片元素是所选元素的一维数组。我看到可以使用reshape 将切片矩阵放入矩阵中,但是有没有一种优雅的方法呢?理想情况下,这个r_mat + a 操作可以被用户定义的函数替换。

【问题讨论】:

  • 简短的回答是否定的,你不能有一个完全任意的用户定义的函数而不循环。但是,您可以强制用户提供一个接受 where 作为参数的函数。
  • 等等,没关系。您之前已经回答了自己的问题。将立即发布。
  • np.frompyfunc 确实创建了一个 ufunc,它采用了 outwhere,但速度与内置的 ufunc 完全不同。

标签: python numpy conditional-statements vectorization


【解决方案1】:

您绝对可以使用用户定义的函数获得矢量化解决方案,只要该函数被矢量化以在一维数组上按元素工作(对于使用开箱即用的 numpy 函数编写的任何东西都应该是这种情况) .

假设您有r_mat 作为(m, n) 矩阵和a_array 作为(m,) 向量。你可以编写你的函数来接受钩子。每个钩子可以是常量或可调用的。如果它是可调用的,它会被两个相同长度的数组调用,并且必须返回第三个相同长度的数组。您可以随意更改该合约以包含索引或任何您想要的内容:

def f(r_mat, a_array, hook11, hook01, hook10, hook00):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)

    def apply_hook(mask, hook):
        r, c = np.nonzero(mask)
        out[r, c] = hook(r_mat[r, c], a_array[r]) if callable(hook) else hook

    apply_hook(row_mask & elem_mask, hook11)
    apply_hook(~row_mask & elem_mask, hook01)
    apply_hook(row_mask & ~elem_mask, hook10)
    apply_hook(~row_mask & ~elem_mask, hook00)

    return out

您代码中的当前配置将被称为

f(r_mat, a_array, np.subtract, np.add, np.nan, 0)

假设您想做比np.subtract 更复杂的事情。例如,您可以这样做:

def my_complicated_func(r, a):
    return np.cumsum(r, a) - 3 * r // a + np.exp(a)

f(r_mat, a_array, my_complicated_func, np.add, np.nan, 0.0)

关键是my_complicated_func 对数组进行操作。它将传递r_mat 元素的子集,并且a_array 的元素在每一行中根据需要重复多次。

您也可以使用知道每个位置索引的函数来做同样的事情。只需将hook 称为hook(r_mat[r, c], a_array[r], r, c)。现在钩子函数必须接受两个额外的参数。原始代码相当于

f(r_mat, a_array, lambda r, a, *args: np.subtract(r, a), lambda r, a, *args: np.add(r, a), np.nan, 0)

【讨论】:

  • “hook”方法与本示例中的直接屏蔽方法类似或稍慢。
  • @Gang。这是灵活性的代价。在某些情况下,使用索引会稍微快一些,因为它不需要额外的通过来确定输出大小。取决于掩码密度和输入大小。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-20
  • 2020-11-26
  • 1970-01-01
  • 2015-11-23
  • 2021-03-03
  • 2012-03-18
相关资源
最近更新 更多