【Question title】: Function speed improvement: Convert ints to list of 32bit ints
【Posted】: 2018-05-05 03:18:03
【Question】:

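I am looking for a faster alternative to my function. The goal is to build a list of 32-bit integers from integers of arbitrary length. Each length is given explicitly in a (value, bitlength) tuple. This is part of a bit-banging procedure for an asynchronous interface that needs four 32-bit integers per bus transaction.

All integers are unsigned (positive or zero), and the bit length can vary between 0 and 2000.

My input is a list of these tuples. The output should be integers with an implicit length of 32 bits, with the bits in sequential order. Any leftover bits that do not fill a 32-bit word should be returned as well.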

input: [(0,128),(1,12),(0,32)]
output:[0, 0, 0, 0, 0x100000], 0, 12
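
To make the expected bit order concrete, here is a minimal reference sketch (not the code being optimized). It assumes the fields are concatenated most-significant-bit first, which is what the example output implies, and that each value fits in its stated bit length. The name `pack_reference` is hypothetical.

```python
def pack_reference(tuples):
    """Slow but explicit reference: concatenate all fields MSB-first, then split."""
    value = 0
    total_bits = 0
    for field, nbits in tuples:
        # Append `nbits` bits of `field` below everything gathered so far.
        value = (value << nbits) | (field & ((1 << nbits) - 1))
        total_bits += nbits

    remlen = total_bits % 32
    remint = value & ((1 << remlen) - 1)  # low-order leftover bits
    value >>= remlen                      # what remains is a whole number of words

    nwords = total_bits // 32
    # Emit the most significant 32-bit word first.
    words = [(value >> (32 * (nwords - 1 - i))) & 0xFFFFFFFF for i in range(nwords)]
    return words, remint, remlen


words, remint, remlen = pack_reference([(0, 128), (1, 12), (0, 32)])
print([hex(w) for w in words], remint, remlen)
# ['0x0', '0x0', '0x0', '0x0', '0x100000'] 0 12
```

The 172 input bits split into five full words plus 12 leftover bits; the single set bit of the 12-bit field lands at bit 20 of the fifth word.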

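I have spent a day or two profiling with cProfile and trying different approaches, but I seem to be stuck with functions that handle roughly 100k tuples per second, which is rather slow. Ideally I would like a 10x speedup, but I do not have enough experience to know where to start. The ultimate goal is to exceed 4M tuples per second.

Thanks for any help or suggestions.

The fastest I have managed is: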

def foo(tuples):
    '''make a list of tuples of (int, length) into a list of 32 bit integers [1,2,3]'''
    length = 0
    remlen = 0
    remint = 0
    i32list = []
    for a, b in tuples:
        n = (remint << (32-remlen)) | a #n = (a << (remlen)) | remint
        length += b
        if length > 32:
            len32 = int(length/32)
            for i in range(len32):
                i32list.append((n >> i*32) & 0xFFFFFFFF)
            remint = n >> (len32*32)
            remlen = length - len32*32
            length = remlen
        elif length == 32:
            appint = n & 0xFFFFFFFF
            remint = 0
            remlen = 0
            length -= 32
            i32list.append(appint)
        else:
            remint = n
            remlen = length
    return i32list, remint, remlen

This has very similar performance:

def tpli_2_32ili(tuples):
    '''make a list of tuples of (int, length) into a list of 32 bit integers [1,2,3]'''
#    binarylist = "".join([np.binary_repr(a, b) for a, b in inp]) # bin(a)[2:].rjust(b, '0')
    binarylist = "".join([bin(a)[2:].rjust(b, '0') for a, b in tuples])
    totallength = len(binarylist)
    tot32 = int(totallength/32)
    i32list = [int(binarylist[i:i+32],2) for i in range(0, tot32*32, 32) ]
    remlen = totallength - tot32*32
    remint = int(binarylist[-remlen:],2)
    return i32list, remint, remlen

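【Comments】:

  • @martineau: Sorry, fixed.
  • A possible suggestion: switch to pypy and check the speed (note: the pypy JIT compiler needs a long time to warm up, so run it on at least a few seconds' worth of data), then start digging into the JIT traces to see what is holding your code back, e.g. here. Function calls in Python are known to carry a lot of overhead, so if possible, try having your function process a whole batch of tuples at once. Or perhaps take a look at Cython.

Tags: python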


【Solution 1】:

The best I have come up with so far is a 25% speedup:

from functools import reduce

intMask = 0xffffffff

def f(x,y):
    return (x[0] << y[1]) + y[0], x[1] + y[1]

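def jens(input):
    n, length = reduce( f , input, (0,0) )
    remainderBits = length % 32
    intBits = length - remainderBits
    # Python ints never overflow, so mask off the low remainderBits directly.
    remainder = n & ((1 << remainderBits) - 1)
    n >>= remainderBits

    # Shift each 32-bit word down before masking, most significant word first.
    ints = [(n >> i) & intMask for i in range(intBits-32, -32, -32)]
    # Note the return order: words, number of leftover bits, leftover value.
    return ints, remainderBits, remainder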

print([hex(x) for x in jens([(0,128),(1,12),(0,32)])[0]])

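It uses a long (an arbitrary-precision integer) to accumulate the tuple values according to their bit lengths, then extracts the 32-bit values and the remaining bits from that number. The total length (the sum of the length values of the input tuples) and the large value are both computed in a single pass, with reduce supplying the loop internally.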
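
As a variation on the extraction step, the big integer can be serialized once and split with the standard library instead of shifting and masking word by word. The sketch below is an alternative technique, not the code that was benchmarked, and whether it is actually faster would need to be measured. It assumes each value fits in its stated bit length (otherwise `int.to_bytes` raises `OverflowError`); the name `pack_with_struct` is hypothetical.

```python
import struct
from functools import reduce


def pack_with_struct(tuples):
    """Fold the fields into one big int, then split it with int.to_bytes + struct."""
    # Same fold as described above: shift the accumulator left, append each field,
    # and add up the bit lengths in the same pass.
    n, length = reduce(lambda acc, t: ((acc[0] << t[1]) | t[0], acc[1] + t[1]),
                       tuples, (0, 0))
    nwords, remlen = divmod(length, 32)
    remint = n & ((1 << remlen) - 1)  # leftover low-order bits
    n >>= remlen                      # now exactly nwords * 32 bits wide
    # Serialize big-endian and unpack as nwords unsigned 32-bit integers.
    raw = n.to_bytes(4 * nwords, "big")
    words = list(struct.unpack(">%dI" % nwords, raw))
    return words, remint, remlen


print(pack_with_struct([(0, 128), (1, 12), (0, 32)]))
# ([0, 0, 0, 0, 1048576], 0, 12)
```

Unlike `jens`, this sketch returns the leftover value before the leftover bit count, matching the order used in the question.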

Running martineau's benchmark harness, the best numbers I saw were:

Fastest to slowest execution speeds using Python 3.6.5
(1,000 executions, best of 3 repetitions)

          jens :  0.004151 secs, rel speed  1.00x,     0.00% slower
 First snippet :  0.005259 secs, rel speed  1.27x,    26.70% slower
Second snippet :  0.008328 secs, rel speed  2.01x,   100.64% slower

You would probably get a better speedup by using a C extension that implements a bit array.

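【Comments】:

    【Solution 2】:

    This is not an answer with a faster implementation. Instead, the code from the two snippets in your question has been placed in an extensible benchmarking framework, which makes it very easy to compare different approaches.

    Comparing just those two test cases shows that your second approach does not have very similar performance to the first, based on the output shown. In fact, it is almost twice as slow.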

    from collections import namedtuple
    import sys
    from textwrap import dedent
    import timeit
    import traceback
    
    N = 1000  # Number of executions of each "algorithm".
    R = 3  # Number of repetitions of those N executions.
    
    # Common setup for all testcases (executed before any algorithm specific setup).
    COMMON_SETUP = dedent("""
        # Import any resources needed defined in outer benchmarking script.
        #from __main__ import ??? # Not needed at this time
    """)
    
    
    class TestCase(namedtuple('CodeFragments', ['setup', 'test'])):
        """ A test case is composed of separate setup and test code fragments. """
        def __new__(cls, setup, test):
            """ Dedent code fragment in each string argument. """
            return tuple.__new__(cls, (dedent(setup), dedent(test)))
    
    
    testcases = {
        "First snippet": TestCase("""
            def foo(tuples):
                '''make a list of tuples of (int, length) into a list of 32 bit integers [1,2,3]'''
                length = 0
                remlen = 0
                remint = 0
                i32list = []
                for a, b in tuples:
                    n = (remint << (32-remlen)) | a #n = (a << (remlen)) | remint
                    length += b
                    if length > 32:
                        len32 = int(length/32)
                        for i in range(len32):
                            i32list.append((n >> i*32) & 0xFFFFFFFF)
                        remint = n >> (len32*32)
                        remlen = length - len32*32
                        length = remlen
                    elif length == 32:
                        appint = n & 0xFFFFFFFF
                        remint = 0
                        remlen = 0
                        length -= 32
                        i32list.append(appint)
                    else:
                        remint = n
                        remlen = length
    
                return i32list, remint, remlen
            """, """
            foo([(0,128),(1,12),(0,32)])
            """
    
        ),
        "Second snippet": TestCase("""
            def tpli_2_32ili(tuples):
                '''make a list of tuples of (int, length) into a list of 32 bit integers [1,2,3]'''
                binarylist = "".join([bin(a)[2:].rjust(b, '0') for a, b in tuples])
                totallength = len(binarylist)
                tot32 = int(totallength/32)
                i32list = [int(binarylist[i:i+32],2) for i in range(0, tot32*32, 32) ]
                remlen = totallength - tot32*32
                remint = int(binarylist[-remlen:],2)
                return i32list, remint, remlen
            """, """
            tpli_2_32ili([(0,128),(1,12),(0,32)])
            """
        ),
    }
    
    # Collect timing results of executing each testcase multiple times.
    try:
        results = [
            (label,
             min(timeit.repeat(testcases[label].test,
                               setup=COMMON_SETUP + testcases[label].setup,
                               repeat=R, number=N)),
            ) for label in testcases
        ]
    except Exception:
        traceback.print_exc(file=sys.stdout)  # direct output to stdout
        sys.exit(1)
    
    # Display results.
    major, minor, micro = sys.version_info[:3]
    print('Fastest to slowest execution speeds using Python {}.{}.{}\n'
          '({:,d} executions, best of {:d} repetitions)'.format(major, minor, micro, N, R))
    print()
    
    longest = max(len(result[0]) for result in results)  # length of longest label
    ranked = sorted(results, key=lambda t: t[1]) # ascending sort by execution time
    fastest = ranked[0][1]
    for result in ranked:
        print('{:>{width}} : {:9.6f} secs, rel speed {:5,.2f}x, {:8,.2f}% slower '
              ''.format(
                    result[0], result[1], round(result[1]/fastest, 2),
                    round((result[1]/fastest - 1) * 100, 2),
                    width=longest))
    

    Output:

    Fastest to slowest execution speeds using Python 3.6.5
    (1,000 executions, best of 3 repetitions)
    
     First snippet :  0.003024 secs, rel speed  1.00x,     0.00% slower
    Second snippet :  0.005085 secs, rel speed  1.68x,    68.13% slower
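
The harness above times each function on a single three-tuple input. As a rough sketch of how throughput in tuples per second could be estimated on a larger batch, the following uses `timeit.repeat` directly. The function `pack`, the 1,000-tuple workload, and the 1 to 64 bit length range are all assumptions made for illustration; they are not taken from the question.

```python
import random
import timeit


def pack(tuples):
    """Hypothetical candidate under test: fold into one int, then split into words."""
    n = 0
    length = 0
    for value, nbits in tuples:
        n = (n << nbits) | value
        length += nbits
    nwords, remlen = divmod(length, 32)
    remint = n & ((1 << remlen) - 1)
    n >>= remlen
    words = [(n >> (32 * i)) & 0xFFFFFFFF for i in range(nwords - 1, -1, -1)]
    return words, remint, remlen


random.seed(0)  # fixed seed so that runs are comparable
# Assumed workload: 1,000 tuples with bit lengths between 1 and 64.
data = []
for _ in range(1000):
    nbits = random.randint(1, 64)
    data.append((random.getrandbits(nbits), nbits))

N = 20  # executions per repetition
R = 3   # repetitions; the minimum is the least disturbed measurement
best = min(timeit.repeat(lambda: pack(data), repeat=R, number=N))
print("{:,.0f} tuples/sec".format(len(data) * N / best))
```

The printed figure depends on the machine and on the distribution of bit lengths, so it is only meaningful when comparing candidates on the same input.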
    

    【Comments】:
