【问题标题】:How to improve np.random.choice() looping efficiency如何提高 np.random.choice() 循环效率
【发布时间】:2019-12-05 20:02:27
【问题描述】:

我正在尝试将np.random.choice 应用于具有不同权重的大数组,并且想知道有什么方法可以避免循环并提高性能?在这里len(weights) 可能是数百万。

weights = [[0.1, 0.5, 0.4],
           [0.2, 0.4, 0.4],
           ...
           [0.3, 0.3, 0.4]]

choice = [1, 2, 3]
ret = np.zeros((len(weights), 20))
for i in range(len(weights)):
    ret[i] = np.random.choice(choice, 20, p=weights[i])

【问题讨论】:

  • 恐怕你会忍不住循环播放。也许,您仍然可以使用numba 来加快速度?可能有帮助的一件事是,如果您有很多 weights 重复项,您可以通过单个 np.random.choice() 调用生成这些重复项,然后将其放在正确的 ret 位置。此外,您可能希望将:for i in range(len(weights)): ... 更改为 for i, weight in enumerate(weights): ...(但这不会提高您的速度,这只是风格上的良好做法)。
  • @WarrenWeckesser 他们如何在链接的项目中合并项目数量?
  • @Divakar,嗯,你知道,广播等 :) 好的,它不是完全重复的。
  • 那是“权重”的形状?它可能对基准测试有用

标签: python python-3.x numpy vectorization


【解决方案1】:

这是我在Fast random weighted selection across all rows of a stochastic matrix 中的回答的概括:

def vectorized_choice(p, n, items=None):
    s = p.cumsum(axis=1)
    r = np.random.rand(p.shape[0], n, 1)
    q = np.expand_dims(s, 1) >= r
    k = q.argmax(axis=-1)
    if items is not None:
        k = np.asarray(items)[k]
    return k

p 应该是一个二维数组,其行是概率向量。 n 是从每行定义的分布中抽取的样本数。如果items 为无,则样本为range(0, p.shape[1]) 中的整数。如果items 不是None,它应该是一个长度为p.shape[1] 的序列。

例子:

In [258]: p = np.array([[0.1, 0.5, 0.4], [0.75, 0, 0.25], [0, 0, 1], [1/3, 1/3, 1/3]])                                                   

In [259]: p                                                                                                                              
Out[259]: 
array([[0.1       , 0.5       , 0.4       ],
       [0.75      , 0.        , 0.25      ],
       [0.        , 0.        , 1.        ],
       [0.33333333, 0.33333333, 0.33333333]])

In [260]: vectorized_choice(p, 20)                                                                                                       
Out[260]: 
array([[1, 1, 2, 1, 1, 2, 2, 2, 1, 2, 1, 1, 1, 2, 2, 0, 1, 2, 2, 2],
       [0, 2, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0],
       [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
       [1, 0, 2, 2, 0, 1, 2, 1, 0, 0, 0, 0, 2, 2, 0, 0, 2, 1, 1, 2]])

In [261]: vectorized_choice(p, 20, items=[1, 2, 3])                                                                                      
Out[261]: 
array([[2, 1, 2, 2, 2, 3, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3, 2, 3, 2, 2],
       [1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 1, 1, 3, 3, 1, 3, 1, 1, 1],
       [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3],
       [3, 3, 3, 1, 3, 2, 1, 2, 3, 1, 2, 2, 3, 2, 1, 2, 1, 2, 2, 2]])

p 与形状 (1000000, 3) 的计时:

In [317]: p = np.random.rand(1000000, 3)

In [318]: p /= p.sum(axis=1, keepdims=True)

In [319]: %timeit vectorized_choice(p, 20, items=np.arange(1, p.shape[1]+1))
1.89 s ± 28.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

这是 Divakar 功能的时间安排:

In [320]: %timeit random_choice_prob_vectorized(p, 20, choice=np.arange(1, p.shape[1]+1))
7.33 s ± 43.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

如果你增加p中的列数,差异会不那么明显,如果你使列数足够大,Divakar的功能会更快。例如

In [321]: p = np.random.rand(1000, 120)

In [322]: p /= p.sum(axis=1, keepdims=True)

In [323]: %timeit vectorized_choice(p, 20, items=np.arange(1, p.shape[1]+1))
6.41 ms ± 20.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [324]: %timeit random_choice_prob_vectorized(p, 20, choice=np.arange(1, p.shape[1]+1))
6.29 ms ± 342 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

【讨论】:

  • 很高兴看到时间安排!有道理。
  • 更新了我的解决方案。不如你的好,但比我之前的好很多。你能麻烦更新一下时间吗?谢谢!
  • @Divakar,好的,但是在我的测试中,新版本实际上比旧版本慢了一点。 :(
  • 谢谢,但是在vectorized_choice中,prob_matrix应该是p,对吧?
  • @Nick,是的,已修复。
【解决方案2】:

借鉴Vectorizing numpy.random.choice for given 2D array of probabilities along an axisidea from vectorized searchsorted的想法,这是一种矢量化方式-

def random_choice_prob_vectorized(weights, num_items, choice=None):
    weights = np.asarray(weights)

    w = weights.cumsum(1)
    r = np.random.rand(len(weights),num_items)

    m,n = w.shape
    o = np.arange(m)[:,None]
    w_o = (w+o).ravel()
    r_o = (r+o).ravel()
    idx = np.searchsorted(w_o,r_o).reshape(m,-1)%n
    if choice is not None:
        return np.asarray(choice)[idx]
    else:
        return idx

使用2D bincount进行验证的示例运行-

In [28]: weights = [[0.1, 0.5, 0.4],
    ...:            [0.2, 0.4, 0.4],
    ...:            [0.3, 0.3, 0.4]]
    ...: 
    ...: choice = [1, 2, 3]
    ...: num_items = 20000

In [29]: out = random_choice_prob_vectorized(weights, num_items, choice)

# Use 2D bincount to get per average occurences and verify against weights
In [75]: bincount2D_vectorized(out)/num_items
Out[75]: 
array([[0.     , 0.09715, 0.4988 , 0.40405],
       [0.     , 0.1983 , 0.40235, 0.39935],
       [0.     , 0.30025, 0.29485, 0.4049 ]])

【讨论】:

    【解决方案3】:

    看起来结果数组的每一行都独立于其他行。我不确定现在的表现有多糟糕。如果这真的是一个问题,我会尝试使用 python 的multiprocessing 模块来运行随机数生成,并并行处理多个进程。它应该会有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-10-08
      • 2019-11-06
      • 2018-04-18
      • 1970-01-01
      • 2012-01-04
      • 2019-10-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多