【问题标题】:Optimize an algorithm which is executed a lot of times优化执行多次的算法
【发布时间】:2017-11-07 10:32:36
【问题描述】:

我有几个for 循环,最里面的循环会被执行很多次。这个最里面的循环包含一些使用 numpy 的繁重计算,所以所有这些都需要很多时间。所以我正在尝试优化最里面的循环。

最内层循环包含以下逻辑:

我有两个 numpy 数组(在现实生活中要大得多):

left = np.asarray([0.4, 0.2, 0.2, 0.7, 0.6, 0.2, 0.3])
right= np.asarray([0.2, 0.7, 0.3, 0.2, 0.1, 0.9, 0.7])

将这些与阈值进行比较,看看我应该向左还是向右。如果left[x] > 0.55 and right[x] < 0.45 我想向左走。 如果left[x] < 0.55 and right[x] > 0.45 我想向右走。 我已经通过创建两个布尔数组来解决这个问题,一个用于左侧,一个用于右侧,根据:

leftListBool = ((left > 0.55)*1 + (right < 0.45)*1 - 1) > 0
rightListBool = ((right > 0.55)*1 + (left < 0.45)*1 - 1) > 0

上面的例子给了我:

leftListBool = [False False False  True  True False False]
rightListBool = [False  True False False False  True  True]

但是如果我最后一次向左走,我就不能向左走(右边也一样)。因此,我根据以下内容循环这些列表:

wentLeft = False
wentRight = False
a = 0
for idx, v in enumerate(leftListBool):
    if leftListBool[idx] and not wentRight:
        a += DoAThing(idx)
        wentLeft = False
        wentRight = True
    elif rightListBool[idx] and not wentLeft:
        a += DoAnotherThing(idx)
        wentLeft = True
        wentRight = False

DoAThing()DoAnotherThing() 只是从 numpy 数组中获取一个值。

这是我在优化方面所做的(以前更糟)。请注意,我需要以正确的顺序执行 DoAThing()DoAnotherThing(),因为它们取决于之前的值。

我尝试了什么?

我的第一个想法是创建一个leftListboolrightListBool 的统一列表,看起来像(左 = 1 和右 = -1):

unified = [0 1 0 -1 -1 1 1]

但我坚持以比以下更优化的方式做到这一点:

buyListBool.astype(int)-sellListBool.astype(int)

但即使我实现了这一点,我也只需要包含第一个值,例如,如果我有两个 1 彼此跟随,这将导致:

unified = [0 1 0 -1 0 1 0]

在这种情况下,我可以将 for 循环简化为:

for i in unified:
    if i == 1:
        a += DoAThing(a)
    elif i == -1:
        a += DoAnotherThing(a)

但即使是这个 for 循环也可以使用一些我还没有弄清楚的 numpy-magic 进行优化。

完整的可运行代码:

start = time.time()

topLimit = 0.55
bottomLimit = 0.45

for outI in range(200):
    for midI in range(200):
        topLimit = 0.55
        bottomLimit = 0.45
        res = np.random.rand(200,3)
        left = res[:,0]        
        right = res[:,1]
        valList = res[:,2]

        #These two statements can probably be optimized 
        leftListBool = ((left > topLimit)*1 + (right < bottomLimit)*1 - 1) > 0
        rightListBool = ((right > topLimit)*1 + (left < bottomLimit)*1 - 1) > 0

        wentLeft = False
        wentRight = False
        a=0
        #Hopefully this loop can be optimized
        for idx, v in enumerate(leftListBool):
            if leftListBool[idx] and not wentRight:
                a += valList[idx]
                wentLeft = False
                wentRight = True
            elif rightListBool[idx] and not wentLeft:
                a += valList[idx]
                wentLeft = True
                wentRight = False

end = time.time()
print(end - start)

【问题讨论】:

  • 我觉得这个问题适合codereview
  • leftListBool = ((left &gt; 0.55)*1 + (right &lt; 0.45)*1 - 1) &gt; 0 例如,这可能是 leftListBool = (left &gt; 0.55 AND right &lt; 0.45)
  • 如果处理就这么简单,我认为任何微优化都不会产生重大影响。由于您将 DoAThing 和 DoAnotherThing 留在黑匣子中,我们无法判断它们是否代表重要的处理。
  • val 到底是什么?我认为这是一个错字,应该是leftListBool,就像前面的例子一样。但是,如果您更正它(或提供它的生成方式)会很好。
  • @Ev.Kounis 如果这些是 numpy 数组,我真的可以做left &gt; 0.55 AND right &lt; 0.45 吗?

标签: python performance loops numpy optimization


【解决方案1】:

如果您需要循环遍历序列并且关心性能,则不应使用numpy.arrays。当 NumPy 可以执行循环时,NumPy 数组很棒,但是如果你必须自己循环它会很慢(如果你想看看,我最近在另一个答案中详细介绍了为什么数组上的迭代非常慢:@ 987654321@).

您可以简单地使用 tolistzip 来避免迭代 numpy-array 开销:

import time
import numpy as np

start = time.time()

topLimit = 0.55
bottomLimit = 0.45

for outI in range(200):
    for midI in range(200):
        topLimit = 0.55
        bottomLimit = 0.45
        res = np.random.rand(200,2)
        left = res[:,0].tolist()      # tolist!
        right = res[:,1].tolist()     # tolist!

        wentLeft = False
        wentRight = False
        a=0

        for leftitem, rightitem in zip(left, right):
            if leftitem > topLimit and rightitem < bottomLimit and not wentRight:
                wentLeft, wentRight = False, True
            elif rightitem > topLimit and leftitem < bottomLimit and not wentLeft:
                wentLeft, wentRight = True, False

end = time.time()
print(end - start)

这将我的计算机上的运行时间减少了 30%。

您也可以稍后进行tolist 转换(可能更快也可能不会更快):

start = time.time()

topLimit = 0.55
bottomLimit = 0.45

for outI in range(200):
    for midI in range(200):
        topLimit = 0.55
        bottomLimit = 0.45
        res = np.random.rand(200,2)
        left = res[:,0]     
        right = res[:,1]

        # use tolist after the comparisons
        leftListBool = ((left > topLimit) & (right < bottomLimit)).tolist()
        rightListBool = ((right > topLimit) & (left < bottomLimit)).tolist()

        wentLeft = False
        wentRight = False
        a=0
        #Hopefully this loop can be optimized
        for idx in range(len(leftListBool)):  # avoid direct iteration over an array
            if leftListBool[idx] and not wentRight:
                #a += DoAThing(a)
                wentLeft = False
                wentRight = True
            elif rightListBool[idx] and not wentLeft:
                #a += DoAnotherThing(a)
                wentLeft = True
                wentRight = False

end = time.time()
print(end - start)

这与其他方法大致一样快,但是当 leftright 获得比 200 个大得多的元素时,它可能会变得快得多

但这只是基于算法,​​不知道DoAThingDoAnotherThing。您可以以允许矢量化操作的方式构造它们(这可以在不使用lists 的情况下将其加速一个数量级)。不过这要困难得多,我不知道这些函数在做什么。

【讨论】:

  • 完美,感谢您提供有关迭代 numpy-arrays 的信息,并感谢您提供的链接。这正是我正在寻找的信息类型。我已经用有关DoAThingDoAnotherThing 的信息更新了我的问题。他们基本上从数组中获取一个值,根据您的解决方案将其放入 zip 中。如果没有其他结果,我会试一试,然后接受您的回答。
  • @Cleared 所以DoAThingDoAnotherThing 是一样的吗?
  • 注:我认为问一个新问题包括你的valList 会更合适。在收到答案后更改问题中的重要内容并不是特别有用,因为它会使现有答案无效(可以进行诸如拼写错误之类的琐碎编辑)。
  • 感谢您的评论,我会考虑这一点(以及在我未来的问题中)。但是因为我觉得你的回答仍然有效,所以我认为没关系。我只是想指出我无法对DoAThingDoAnotherThing 进行矢量化(在阅读了我的问题后,我意识到我失败了)。
【解决方案2】:

根据更新后的问题,我将介绍一种矢量化代码的方法:

import time

start = time.time()

topLimit = 0.55
bottomLimit = 0.45

for outI in range(200):
    for midI in range(200):
        topLimit = 0.55
        bottomLimit = 0.45
        res = np.random.rand(200,3)
        left = res[:,0]        
        right = res[:,1]
        valList = res[:,2]

        # Arrays containing where to go left and when to go right
        leftListBool = ((left > topLimit) & (right < bottomLimit))
        rightListBool = ((right > topLimit) & (left < bottomLimit))

        # Exclude all points that are neither right or left
        common = leftListBool | rightListBool
        valList = valList[common]
        leftListBool = leftListBool[common]
        rightListBool = rightListBool[common]

        # Remove the values where you would go right or left multiple times in a row
        leftListBool[1:] &= leftListBool[1:] ^ leftListBool[:-1]
        rightListBool[1:] &= rightListBool[1:] ^ rightListBool[:-1]
        valList = valList[leftListBool | rightListBool]

        # Just use np.sum to calculate the sum of the remaining items
        a = np.sum(valList)

end = time.time()
print(end - start)

内部循环完全矢量化,并且该方法(在我的计算机上)比原始代码快 3 倍。如果我需要对某些部分添加更多解释,请告诉我。 ^(异或运算符)只是np.diff 的一种更高效的方式,仅适用于布尔数组。

【讨论】:

  • 感谢您的努力和回答,我真的很感激。我试图使可运行的代码尽可能简单和精简,但我意识到我已经把它精简了。 valList[idx] 只是应该代表一个对无法矢量化的整体性能影响很小的任意操作。我已更新以使其更好地代表我的情况。在每次迭代中,我从列表中获取一个值,位置取决于我最后一次获取的值。我将发布另一个关于如何矢量化我的DoAThing(a) 的问题并在那里提供更多详细信息
  • @Cleared 好的,请提出一个新问题,包括所有相关细节并回滚这个问题。做增量回答 -> 编辑 -> 回答 -> 编辑是没有意义的。您不断因这些更改而使答案无效。
  • 我已将问题回滚到答案适用的状态(因此更改仅包括代码的错误修复)。我将提出一个关于矢量化的新问题。感谢您的反馈
猜你喜欢
  • 1970-01-01
  • 2015-02-27
  • 2023-03-25
  • 1970-01-01
  • 2023-03-11
  • 2016-07-30
  • 2023-02-03
  • 2015-11-03
相关资源
最近更新 更多