【问题标题】:Iteration performance迭代性能
【发布时间】:2018-04-13 12:08:47
【问题描述】:

我创建了一个函数来通过实验评估以下问题,取自A Primer for the Mathematics of Financial Engineering

问题:设 X 为您必须掷出一枚公平硬币直到正面朝上的次数。什么是 E[X](期望值)和 var(X)(方差)?

按照教科书的解决方案,以下代码得出正确答案:

from sympy import *

k = symbols('k')

Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work

Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2

为了验证这个答案,我决定尝试制作一个函数来模拟这个实验。以下代码是我想出的。

def coin_toss(toss, probability=[0.5, 0.5]): 

    """Computes expected value and variance for coin toss experiment"""

    flips = [] # Collects total number of throws until heads appear per experiment.

    for _ in range(toss): # Simulate n flips

        number_flips=[] # Number of flips until heads is tossed  

        while sum(number_flips) == 0: # Continue simulation while Tails are thrown 

            number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips

        flips.append(len(number_flips)) #Append number of flips until lands heads to flips

    Expected_Value, Variance  = np.mean(flips), np.var(flips)    

    print('E[X]: {}'.format(Expected_Value), 
          '\nvar[X]: {}'.format(Variance)) # Return expected value 

如果我使用以下代码模拟 1e6 实验,运行时间约为 35.9 秒。

   from timeit import Timer

   t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")

   print(t1.timeit(1))

为了加深我对 Python 的理解,这是解决此类问题的一种特别有效/Python 式的方法吗?如何利用现有库来提高效率/流程执行?

【问题讨论】:

    标签: python python-3.x math probability coin-flipping


    【解决方案1】:

    为了以一种高效的 Python 方式进行编码,您必须查看 PythonSpeedNumPy。下面可以找到一个使用 numpy 的更快代码的示例。

    在python+numpy中优化的abc是向量化操作,在这种情况下是相当困难的,因为有一个while实际上可能是无限的,硬币可以反面40连续几次。但是,可以在 中完成工作,而不是使用 toss 迭代执行 for。这是问题中的coin_tosscoin_toss_2d 方法之间的主要区别。

    coin_toss_2d

    coin_toss_2d 的主要优点是按块工作,这些块的大小有一些默认值,但可以修改它们(因为它们会影响速度)。因此,它只会迭代while current_toss<toss 多次toss%flips_at_a_time。这是通过 numpy 实现的,它允许生成一个矩阵,其结果重复flips_at_a_time 次掷硬币的实验flips_per_try 次。该矩阵将包含 0(尾部)和 1(头部)。

    # i.e. doing only 5 repetitions with 3 flips_at_a_time
    flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
    # Out 
    [[0 0 0] # still no head, we will have to keep trying
     [0 1 1] # head at the 2nd try (position 1 in python)
     [1 0 0]
     [1 0 1]
     [0 0 1]]
    

    一旦得到这个结果,就会调用argmax。这会找到对应于每行(重复)的最大值(将是 1,一个头)的索引,并且在多次出现的情况下,返回第一个,这正是所需要的,在一系列尾之后的第一个头.

    maxs = flip_events.argmax(axis=1)
    # Out
    [0 1 0 0 2] 
    # The first position is 0, however, flip_events[0,0]!=1, it's not a head!
    

    但是,必须考虑所有行为0的情况。在这种情况下,最大值将为 0,它的第一次出现也将为 0,即第一列(尝试)。因此,我们检查在第一次尝试中找到的所有最大值是否对应于第一次尝试中的正面。

    not_finished = (maxs==0) & (flip_events[:,0]!=1)
    # Out
    [ True False False False False] # first repetition is not finished
    

    如果不是这样,我们会循环重复相同的过程,但只针对在任何尝试中都没有头部的重复。

    n = np.sum(not_finished)
    while n!=0: # while there are sequences without any head
        flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
        maxs2 = flip_events.argmax(axis=1)
        maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
        not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
        not_finished[not_finished] = not_finished2
        n = np.sum(not_finished)
    # Out
    # flip_events
    [[1 0 1]] # Now there is a head
    # maxs2
    [0]
    # maxs
    [3 1 0 0 2] # The value of the still unfinished repetition has been updated,
    # taking into account that the first position in flip_events is the 4th,
    # without affecting the rest
    

    然后存储对应于第一个头部出现的索引(我们必须添加 1,因为 python 索引从零开始而不是 1)。有一个try ... except ... 块来处理toss 不是repetitions_at_a_time 的倍数的情况。

    def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
        # Initialize and preallocate data
        current_toss = 0
        flips = np.empty(toss)
        # loop by chunks
        while current_toss<toss:
            # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
            flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
            # store first head ocurrence
            maxs = flip_events.argmax(axis=1)
            # Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
            not_finished = (maxs==0) & (flip_events[:,0]!=1)
            n = np.sum(not_finished)
            while n!=0: # while there are sequences without any head
                flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
                maxs2 = flip_events.argmax(axis=1)
                maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
                not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
                not_finished[not_finished] = not_finished2
                n = np.sum(not_finished)
            # try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
            try:
                flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
            except ValueError:
                flips[current_toss:] = maxs[:toss-current_toss]+1
            # Update current_toss and move to the next chunk
            current_toss += repetitions_at_a_time
        # Once all values are obtained, average and return them
        Expected_Value, Variance  = np.mean(flips), np.var(flips)    
        return Expected_Value, Variance
    

    coin_toss_map

    这里的代码基本相同,但现在,intrinsec while 是在一个单独的函数中完成的,该函数是从包装函数 coin_toss_map 使用 map 调用的。

    def toss_chunk(args):
        probability,repetitions_at_a_time,flips_per_try = args
        # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
        flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
        # store first head ocurrence
        maxs = flip_events.argmax(axis=1)
        # Check for all tails sequences
        not_finished = (maxs==0) & (flip_events[:,0]!=1)
        n = np.sum(not_finished)
        while n!=0: # while there are sequences without any head
            flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
            maxs2 = flip_events.argmax(axis=1)
            maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
            not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
            not_finished[not_finished] = not_finished2
            n = np.sum(not_finished)
        return maxs+1
    def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
        n_chunks, remainder = divmod(toss,repetitions_at_a_time)
        args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
        if remainder:
            args.append((probability,remainder,flips_per_try))
        flips = np.concatenate(map(toss_chunk,args))
        # Once all values are obtained, average and return them
        Expected_Value, Variance  = np.mean(flips), np.var(flips)    
        return Expected_Value, Variance
    

    性能对比

    在我的电脑中,我得到了以下计算时间:

    In [1]: %timeit coin_toss(10**6)
    # Out 
    # ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
    # ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
    # ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
    # ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
    # 1 loop, best of 3: 46.2 s per loop
    
    In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
    # Out
    # 1 loop, best of 3: 197 ms per loop
    
    In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
    # Out
    # 1 loop, best of 3: 192 ms per loop
    

    均值和方差的结果是:

    In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
    # Out
    # [(1.999848, 1.9990739768960009),
    #  (2.000654, 2.0046035722839997),
    #  (1.999835, 2.0072329727749993),
    #  (1.999277, 2.001566477271)]
    
    In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
    # Out
    # [(1.999552, 2.0005057992959996),
    #  (2.001733, 2.011159996711001),
    #  (2.002308, 2.012128673136001),
    #  (2.000738, 2.003613455356)]
    

    【讨论】:

    • 这是一个很棒的方法!生成多个随机选择数组的选项对我来说应该更明显!在完成您的代码以完全理解流程后,我得到 NameError: name 'mask2' is not defined,大概是在第一个 while 循环中的 n!=0 时。不确定您是否也遇到此错误?
    • 您是否介意进一步解释一下 maxs[mask] += maxs2+flips_per_trymask[mask] = mask2 在做什么?我似乎无法理解这里发生了什么?
    • 其实我想我明白这两行现在在做什么了。当 mask 返回 True 时,它​​们是否替换了 maxs 的相应索引值?
    • 第二次定义mask应该是mask2。从我的笔记本复制并搞砸后,我可能更新了一些评论。我也刚刚为这个变量想了一个更好的名字:not_finished。之后我将编辑帖子,解释说,mask 是一个布尔数组,包含 false 或 true,具体取决于该重复是否已经有一个头部。
    • maxs 数组的更新是考虑到在当前迭代之前已经进行了一些尝试
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-03
    • 2015-09-21
    • 2011-10-04
    相关资源
    最近更新 更多