【问题标题】:Split a Python List into Chunks with Maximum Memory Size将 Python 列表拆分为具有最大内存大小的块
【发布时间】:2021-06-26 09:03:16
【问题描述】:

给定一个 python listbytes 值:

# actual str values un-important
[
    b'foo',
    b'bar',
    b'baz',
    ...
]

如何将列表分成多个块,每个块的最大内存大小低于某个上限?

例如:如果上限是 7 个字节,那么原始列表将被分解为列表列表

[
    [b'foo', b'bar'], # sublist 0
    [b'baz'], # sublist 1
    ...
]

根据列表内容的累积长度,每个子列表最多为 7 个字节。

注意:每个子列表都应该按照原始列表的顺序最大程度地打包。在上面的示例中,前 2 个 str 值被分组,因为它是 7 字节限制下可能的最大值。

提前感谢您的考虑和回复。

【问题讨论】:

  • 在 Python 中获取对象的大小是一项相当复杂/昂贵的操作,因为您需要递归迭代容器类型,如本答案 stackoverflow.com/a/30316760/548562 中所述。这是一个有趣的问题,为什么要根据大小将列表分成块?
  • 我对你想要达到的目标有点困惑。 dicts 本身的大小将是一致的,因为它们实际上并不包含值,只是对值的引用。这是XY问题吗?在我的系统上,sys.getsizeof 为所有这些返回 240{'a': []}{'b': [1]}{'c': [1, 2] * 999}
  • 单个字典能否超过 1 KB 阈值?
  • @IainShelvington 这些字典最终将被广播到最大批处理记录大小为 5MB 的 AWS Kinesis。
  • @MisterMiyagi 好的,我明白了你的意思,和我在第一条评论中的意思差不多

标签: python python-3.x


【解决方案1】:

可以贪婪地解决序列的最优拆分问题,以使元素满足给定的最大/最小条件同时保持元素的顺序。 因此,您只需对输入序列进行一次迭代并维护一个元素缓冲区。 在 Python 中,这可以使用生成器进行优雅的编码,其优点是不需要创建结果。

您的问题的大部分算法如下:

def split_by_size(items, max_size, get_size=len):
    buffer = []
    buffer_size = 0
    for item in items:
        item_size = get_size(item)
        if buffer_size + item_size <= max_size:
            buffer.append(item)
            buffer_size += item_size
        else:
            yield buffer
            buffer = [item]
            buffer_size = item_size
    if buffer_size > 0:
        yield buffer

最后一个参数将确定给定项目大小的问题委托给指定的可调用对象。 我不会详述这一点,但我会假设一个简单的len() 就可以了。 此外,这假设每个元素单独满足条件,否则也应该处理这种情况。

测试上面的代码:

import random


k = 10
n = 15
max_size = 10

random.seed(0)
items = [b'x' * random.randint(1, 2 * k // 3) for _ in range(n)]
print(items)
# [b'xxxx', b'xxxx', b'x', b'xxx', b'xxxxx', b'xxxx', b'xxxx', b'xxx', b'xxxx', b'xxx', b'xxxxx', b'xx', b'xxxxx', b'xx', b'xxx']

print(list(split_by_size(items, k)))
# [[b'xxxx', b'xxxx', b'x'], [b'xxx', b'xxxxx'], [b'xxxx', b'xxxx'], [b'xxx', b'xxxx', b'xxx'], [b'xxxxx', b'xx'], [b'xxxxx', b'xx', b'xxx']]

另外,如果您愿意将拆分结果存储在list 中,则上述方法的代码可以稍微紧凑一些:

def chunks_by_size(items, max_size, get_size=len):
    result = []
    size = max_size + 1
    for item in items:
        item_size = get_size(item)
        size += item_size
        if size > max_size:
            result.append([])
            size = item_size
        result[-1].append(item)
    return result

但也稍慢(请参阅下面的基准)。


您也可以考虑使用functools.reduce()(与@NizamMohamed answer基本相同),代码会更短,但可读性可能也更低:

def chunks_by_size_reduce(items, size, get_size=len):
    return functools.reduce(
        lambda a, b, size=size:
            a[-1].append(b) or a
            if a and sum(get_size(x) for x in a[-1]) + get_size(b) <= size
            else a.append([b]) or a, items, [])

而且效率肯定较低,因为get_size() 被考虑的每个元素的“候选”内部列表的每个元素都被调用,这使得O(n k!)k 成为每个子序列中元素的平均数量.有关某些时间安排,请参阅下面的基准。


我不会对使用 itertools.accumulate() 的解决方案感到惊讶,但这也一定会很慢。


加快速度的最简单方法是使用CythonNumba。 在这里,这被应用于split_by_size()。 对于他们两个,代码都不会改变。

对我们获得的所有这些进行基准测试(_cy 代表 Cython 编译版本,_nb 代表 Numba 编译版本):

%timeit list(split_by_size(items * 100000, k + 1))
# 10 loops, best of 3: 281 ms per loop
%timeit list(split_by_size_cy(items * 100000, k + 1))
# 10 loops, best of 3: 181 ms per loop
%timeit list(split_by_size_nb(items * 100000, k + 1))
# 100 loops, best of 3: 5.17 ms per loop
%timeit chunks_by_size(items * 100000, k + 1)
# 10 loops, best of 3: 318 ms per loop
%timeit chunks_by_size_reduce(items * 100000, k + 1)
# 1 loop, best of 3: 1.18 s per loop

请注意,虽然 Numba 编译的版本比其他版本快得多,但它也是最脆弱的,因为它需要将 forceobj 标志设置为 True,这可能会导致执行不稳定。

无论如何,如果最终目标是通过一些 I/O 操作发送分组项,我几乎不相信这会成为瓶颈。


请注意,该算法与其他答案几乎相同,我只是发现这里的代码更简洁。

【讨论】:

    【解决方案2】:

    这个解决方案是functools.reduce

    l = [b'abc', b'def', b'ghi', b'jklm', b'nopqrstuv', b'wx', b'yz']
    
    reduce(lambda a, b, size=7: a[-1].append(b) or a if a and sum(len(x) for x in a[-1]) + len(b) <= size else a.append([b]) or a, l, [])
    

    a 是一个空的listb 是原始list 的一个项目。

    if a and sum(len(x) for x in a[-1]) + len(b) &lt;= size
    检查a 是否不为空,并且最后附加的listbytes 的长度和b 的长度之和不超过size

    a[-1].append(b) or a
    b 附加到a 的最后一个附加list 上,如果条件为True,则返回a

    a.append([b]) or a
    b 制作list 并将新的list 附加到a 并返回a

    输出;

    [[b'abc', b'def'], [b'ghi', b'jklm'], [b'nopqrstuv'], [b'wx', b'yz']]
    

    【讨论】:

      【解决方案3】:

      简单、天真的方法是:

      import sys
      import numpy as np
      
      # init input data - as per the comments, data type does matter, 
      # for memory calculation, and for the sake of example -
      # string is probably the easiest case:
      
      lts=list("abcdefghijklmnopqrstuvwxyz")
      
      data=[{letter: "".join(np.random.choice(lts, np.random.randint(100, 700)))} for letter in lts]
      
      # parameters setup:
      
      threshold=1024
      buffer=[]
      buffer_len=0
      res_data=[]
      
      for el in data:
          len_=sys.getsizeof(list(el.values())[0]) # I assumed it's one key, one value per dictionary (looks like this from your question) 
          if(buffer_len+len_>threshold):
              res_data.append(buffer)
              buffer=[el]
              buffer_len=len_
          else:
              buffer.append(el)
              buffer_len+=len_
      
      if(buffer_len>0):
          res_data.append(buffer)
      
      print(res_data)
      

      【讨论】:

        【解决方案4】:

        保持简短和甜蜜:

        l = [b'foo', b'bar', b'baz']
        
        thresh = 7
        out = []
        cur_size = 0
        for x in l:
            if len(x) > thresh:
                raise ValueError("str too big")
            if cur_size + len(x) > thresh:
                cur_size = 0
            if cur_size == 0:
                out.append([])
            out[-1].append(x)
            cur_size += len(x)
        
        print(out)
        

        这将输出:

        [[b'foo', b'bar'], [b'baz']]
        

        如果我理解正确的话,这应该是你想要的。它非常简单;它所做的只是追加列表中的字符串,并检查它要追加到的当前列表中所有内容的组合大小——如果大小加上下一项大于阈值,它会重新启动。

        【讨论】:

          【解决方案5】:
          from sys import getsizeof
          import math
          def chunkify_list(L, max_size_kb):
              chunk_size_elements = int(math.ceil(len(L)/int(math.ceil(getsizeof(L)/(1024*max_size_kb)))))
              return [L[x: x+chunk_size_elements] for x in range(0, len(L), chunk_size_elements)]
          

          我编写了这段代码,它对我有用。 它需要访问数学

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2020-01-14
            • 2018-10-22
            • 1970-01-01
            • 1970-01-01
            • 2023-01-07
            • 2012-07-12
            • 2018-12-05
            • 1970-01-01
            相关资源
            最近更新 更多