好吧,如果您无法将整个文件读入内存,您可以通过遍历字节来完成此操作。我使用deque 作为辅助数据结构,利用maxlen 参数扫描每对连续的字节。为了让我使用 for 循环而不是容易出错的 while 循环,我使用 two-argument iter 逐字节遍历文件 a。例如iter(iterable, sentinal) 首先让我们构建一个测试用例:
>>> import io, functools
>>> import random
>>> some_bytes = bytearray([random.randint(0, 255) for _ in range(12)] + [171, 195] + [88, 42, 88, 42, 88, 42] + [171, 196]+[200, 211, 141])
>>> some_bytes
bytearray(b'\x80\xc4\x8b\x86i\x88\xba\x8a\x8b\x07\x9en\xab\xc3X*X*X*\xab\xc4\xc8\xd3\x8d')
>>>
现在,一些预备:
>>> from collections import deque
>>> start = deque([b'\xab', b'\xc3'])
>>> stop = deque([b'\xab', b'\xc4'])
>>> current = deque(maxlen=2)
>>> target = []
>>> inside = False
假设我们正在读取文件:
>>> f = io.BytesIO(some_bytes)
现在,创建方便的逐字节可迭代对象:
>>> read_byte = functools.partial(f.read, 1)
现在我们可以更轻松地循环:
>>> for b in iter(read_byte, b''):
... current.append(b)
... if not inside and current == start:
... inside = True
... continue
... if inside and current == stop:
... break
... if inside:
... target.append(b)
...
>>> target
[b'X', b'*', b'X', b'*', b'X', b'*', b'\xab']
>>>
您会注意到这会将“end”的第一个值保留在其中。不过,清理起来很简单。这是一个更加充实的示例,其中分隔符之间有几个“运行”字节:
>>> some_bytes = some_bytes * 3
>>> start = deque([b'\xab', b'\xc3'])
>>> stop = deque([b'\xab', b'\xc4'])
>>> current = deque(maxlen=2)
>>> targets = []
>>> target = []
>>> inside = False
>>> f = io.BytesIO(some_bytes)
>>> read_byte = functools.partial(f.read, 1)
>>> for b in iter(read_byte, b''):
... current.append(b)
... if not inside and current == start:
... inside = True
... continue
... if inside and current == stop:
... inside = False
... target.pop()
... targets.append(target)
... target = []
... if inside:
... target.append(b)
...
b'\xab'
b'\xab'
b'\xab'
>>> targets
[[b'X', b'*', b'X', b'*', b'X', b'*'], [b'X', b'*', b'X', b'*', b'X', b'*'], [b'X', b'*', b'X', b'*', b'X', b'*']]
>>>
这种方法会比将文件读入内存并使用re 慢,但它会节省内存。可能有一些我没有想到的边缘情况需要处理,但我认为扩展上述方法应该很简单。此外,如果有一个“开始”字节序列而没有相应的“停止”,target 列表将继续增长,直到文件耗尽。
最后,也许最好的方法是以可管理的块读取文件,并使用以下逻辑处理这些块。这结合了空间和时间效率。在伪伪代码中:
chunksize = 1024
start = deque([b'\xab', b'\xc3'])
stop = deque([b'\xab', b'\xc4'])
current = deque(maxlen=2)
targets = []
target = []
inside = False
read_chunk = functools.partial(f.read, chunksize)
for bytes_chunk in iter(read_chunk, b''):
for b in bytes_chunk:
< same logic as above >