可以贪婪地解决序列的最优拆分问题,以使元素满足给定的最大/最小条件同时保持元素的顺序。
因此,您只需对输入序列进行一次迭代并维护一个元素缓冲区。
在 Python 中,这可以使用生成器进行优雅的编码,其优点是不需要创建结果。
您的问题的大部分算法如下:
def split_by_size(items, max_size, get_size=len):
buffer = []
buffer_size = 0
for item in items:
item_size = get_size(item)
if buffer_size + item_size <= max_size:
buffer.append(item)
buffer_size += item_size
else:
yield buffer
buffer = [item]
buffer_size = item_size
if buffer_size > 0:
yield buffer
最后一个参数将确定给定项目大小的问题委托给指定的可调用对象。
我不会详述这一点,但我会假设一个简单的len() 就可以了。
此外,这假设每个元素单独满足条件,否则也应该处理这种情况。
测试上面的代码:
import random
k = 10
n = 15
max_size = 10
random.seed(0)
items = [b'x' * random.randint(1, 2 * k // 3) for _ in range(n)]
print(items)
# [b'xxxx', b'xxxx', b'x', b'xxx', b'xxxxx', b'xxxx', b'xxxx', b'xxx', b'xxxx', b'xxx', b'xxxxx', b'xx', b'xxxxx', b'xx', b'xxx']
print(list(split_by_size(items, k)))
# [[b'xxxx', b'xxxx', b'x'], [b'xxx', b'xxxxx'], [b'xxxx', b'xxxx'], [b'xxx', b'xxxx', b'xxx'], [b'xxxxx', b'xx'], [b'xxxxx', b'xx', b'xxx']]
另外,如果您愿意将拆分结果存储在list 中,则上述方法的代码可以稍微紧凑一些:
def chunks_by_size(items, max_size, get_size=len):
result = []
size = max_size + 1
for item in items:
item_size = get_size(item)
size += item_size
if size > max_size:
result.append([])
size = item_size
result[-1].append(item)
return result
但也稍慢(请参阅下面的基准)。
您也可以考虑使用functools.reduce()(与@NizamMohamed answer基本相同),代码会更短,但可读性可能也更低:
def chunks_by_size_reduce(items, size, get_size=len):
return functools.reduce(
lambda a, b, size=size:
a[-1].append(b) or a
if a and sum(get_size(x) for x in a[-1]) + get_size(b) <= size
else a.append([b]) or a, items, [])
而且效率肯定较低,因为get_size() 被考虑的每个元素的“候选”内部列表的每个元素都被调用,这使得O(n k!)、k 成为每个子序列中元素的平均数量.有关某些时间安排,请参阅下面的基准。
我不会对使用 itertools.accumulate() 的解决方案感到惊讶,但这也一定会很慢。
加快速度的最简单方法是使用Cython 或Numba。
在这里,这被应用于split_by_size()。
对于他们两个,代码都不会改变。
对我们获得的所有这些进行基准测试(_cy 代表 Cython 编译版本,_nb 代表 Numba 编译版本):
%timeit list(split_by_size(items * 100000, k + 1))
# 10 loops, best of 3: 281 ms per loop
%timeit list(split_by_size_cy(items * 100000, k + 1))
# 10 loops, best of 3: 181 ms per loop
%timeit list(split_by_size_nb(items * 100000, k + 1))
# 100 loops, best of 3: 5.17 ms per loop
%timeit chunks_by_size(items * 100000, k + 1)
# 10 loops, best of 3: 318 ms per loop
%timeit chunks_by_size_reduce(items * 100000, k + 1)
# 1 loop, best of 3: 1.18 s per loop
请注意,虽然 Numba 编译的版本比其他版本快得多,但它也是最脆弱的,因为它需要将 forceobj 标志设置为 True,这可能会导致执行不稳定。
无论如何,如果最终目标是通过一些 I/O 操作发送分组项,我几乎不相信这会成为瓶颈。
请注意,该算法与其他答案几乎相同,我只是发现这里的代码更简洁。