【问题标题】:Python: Creating a streaming gzip'd file-like?Python:创建类似 gzip 的流式文件?
【发布时间】:2011-01-12 15:31:46
【问题描述】:

我正在尝试找出使用 Python 的 zlib 压缩流的最佳方法。

我有一个类似文件的输入流(input,下面)和一个接受类似文件的输出函数(output_function,下面):

with open("file") as input:
    output_function(input)

我想在将 input 块发送到 output_function 之前对其进行 gzip 压缩:

with open("file") as input:
    output_function(gzip_stream(input))

看起来gzip 模块假定输入或输出将是一个 gzip'd 磁盘上的文件......所以我假设 zlib 模块是我想要的。

但是,它本身并没有提供一种简单的方法来创建类似流文件的...而且它支持的流压缩是通过手动将数据添加到压缩缓冲区,然后刷新该缓冲区来实现的。

当然,我可以为 zlib.Compress.compresszlib.Compress.flush 编写一个包装器(Compresszlib.compressobj() 返回),但我会担心缓冲区大小错误或类似情况。

那么,使用 Python 创建流式 gzip 压缩文件的最简单方法是什么?

编辑:澄清一下,输入流和压缩输出流都太大而无法放入内存,所以像output_function(StringIO(zlib.compress(input.read()))) 这样的东西并不能真正解决问题。

【问题讨论】:

  • 我在 effbot:effbot.org/librarybook/zlib.htm 上找到了相反的实现——类似文件的解压缩 gzip 流——但我正在寻找相反的东西(尽管我想如果我需要自己写,可能会有所帮助)

标签: python gzip zlib


【解决方案1】:

由可重用组件组成的更简洁、更通用的版本:

gzipped_iter = igizip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

以上函数来自my gist

  • iter_ioio_iter 提供与 Iterable[AnyStr] SupportsRead[AnyStr] 的透明转换
  • igzip 进行流式 gzip 压缩
  • (可选)prefetch 通过线程同时从底层可迭代对象中拉取数据,正常让步给消费者,用于并发读/写
def as_bytes(s: str | bytes):
    if type(s) not in [str, bytes]:
        raise TypeError
    return s.encode() if isinstance(s, str) else s


def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns a buffered file obj that reads bytes from an iterable of str/bytes.

    Example:

    iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
    iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        def __init__(self, iterable: Iterable[AnyStr]):
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            try:
                chunk = self._leftover or next(self._iterable)
            except StopIteration:
                return 0    # indicate EOF

            output, self._leftover = chunk[:len(buf)], chunk[len(buf):]
            buf[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterIO(iterable), buffer_size=buffer_size)


def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns an iterator that reads from a file obj in sized chunks.

    Example:

    list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
    list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
     * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    return iter(lambda: fo.read(size), fo.read(0))


def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compresses an iterable of bytes or str (utf8)

    Example:

    gzipped_bytes_iter = igzip(['hello ', 'world!'])
    gzip.decompress(b''.join(gzipped_bytes_iter)).encode() == 'hello world!'
    """
    def gen():
        gzip_format = 0b10000
        c = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + gzip_format)

        yield from (c.compress(as_bytes(chunk)) for chunk in chunks)
        yield c.flush()

    return filter(None, gen())


def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Example:

    def slow_produce(*args):
        for x in args:
            time.sleep(1)
            yield x

    def slow_consume(iterable):
        for _ in iterable:
            time.sleep(1)

    slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

    # Prefetch
    # produce: | 'a' | 'b' |
    # consume:       | 'a' | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3

    # No prefetch
    # produce: | 'a' |     | 'b' |
    # consume:       | 'a' |     | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
     * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()

    def produce():
        for x in iterable:
            queue.put(x)
        queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()

    while True:
        item = queue.get()
        if item is finished:
            break
        else:
            yield item

【讨论】:

    【解决方案2】:

    这有效(至少在 python 3 中):

    with s3.open(path, 'wb') as f:
        gz = gzip.GzipFile(filename, 'wb', 9, f)
        gz.write(b'hello')
        gz.flush()
        gz.close()
    

    在这里,它使用 gzip 压缩写入 s3fs 的文件对象。 神奇的是f参数,也就是GzipFile的fileobj。您必须为 gzip 的标头提供文件名。

    【讨论】:

      【解决方案3】:

      gzip 模块支持压缩为类文件对象,将fileobj 参数传递给GzipFile,以及文件名。你传入的文件名不需要存在,但gzip头有一个文件名字段需要填写。

      更新

      这个答案不起作用。示例:

      # tmp/try-gzip.py 
      import sys
      import gzip
      
      fd=gzip.GzipFile(fileobj=sys.stdin)
      sys.stdout.write(fd.read())
      

      输出:

      ===> cat .bash_history  | python tmp/try-gzip.py  > tmp/history.gzip
      Traceback (most recent call last):
        File "tmp/try-gzip.py", line 7, in <module>
          sys.stdout.write(fd.read())
        File "/usr/lib/python2.7/gzip.py", line 254, in read
          self._read(readsize)
        File "/usr/lib/python2.7/gzip.py", line 288, in _read
          pos = self.fileobj.tell()   # Save current position
      IOError: [Errno 29] Illegal seek
      

      【讨论】:

      • 嗯……我没有注意到……但我不确定它是否会起作用:fileobj 必须是 gzip 的输入流,或者是 gzip 的输出流数据将被写入。所以,总比没有好,但仍然不是我想要的。
      • 请解释为什么这不能解决您的问题。我使用fd=gzip.GzipFile(fileobj=fd),它的工作原理应该是这样。
      • @guettli 作者期望fd 对象没有seek 方法。
      • @AndreyCizov 是的,你是对的。我检查了它并编辑了答案。我希望 user249290 对此没有异议。
      【解决方案4】:

      这是基于 Ricardo Cárdenes 非常有用的答案的更简洁、非自引用的版本。

      from gzip import GzipFile
      from collections import deque
      
      
      CHUNK = 16 * 1024
      
      
      class Buffer (object):
          def __init__ (self):
              self.__buf = deque()
              self.__size = 0
          def __len__ (self):
              return self.__size
          def write (self, data):
              self.__buf.append(data)
              self.__size += len(data)
          def read (self, size=-1):
              if size < 0: size = self.__size
              ret_list = []
              while size > 0 and len(self.__buf):
                  s = self.__buf.popleft()
                  size -= len(s)
                  ret_list.append(s)
              if size < 0:
                  ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
                  self.__buf.appendleft(remainder)
              ret = ''.join(ret_list)
              self.__size -= len(ret)
              return ret
          def flush (self):
              pass
          def close (self):
              pass
      
      
      class GzipCompressReadStream (object):
          def __init__ (self, fileobj):
              self.__input = fileobj
              self.__buf = Buffer()
              self.__gzip = GzipFile(None, mode='wb', fileobj=self.__buf)
          def read (self, size=-1):
              while size < 0 or len(self.__buf) < size:
                  s = self.__input.read(CHUNK)
                  if not s:
                      self.__gzip.close()
                      break
                  self.__gzip.write(s)
              return self.__buf.read(size)
      

      优点:

      • 避免重复的字符串连接,这会导致整个字符串被重复复制。
      • 从输入流中读取固定大小的 CHUNK,而不是一次读取整行(可以任意长)。
      • 避免循环引用。
      • 避免 GzipCompressStream() 的误导性公共“写入”方法,该方法实际上只在内部使用。
      • 利用名称修饰来处理内部成员变量。

      【讨论】:

      • 对于python 3,请改用ret = b''.join(ret_list),如果您的输入流是字符串,那么您必须进行编码,例如self.__gzip.write(s.encode('utf-8')
      【解决方案5】:

      它非常笨拙(自我引用等;只需花几分钟时间编写它,没有什么真正优雅的),但如果您仍然对直接使用 gzip 而不是 zlib 感兴趣,它可以满足您的需求。

      基本上,GzipWrap 是一个(非常有限的)类文件对象,它从给定的可迭代对象(例如,类文件对象、字符串列表、任何生成器...)中生成压缩文件。

      当然,它会生成二进制文件,所以实现“readline”是没有意义的。

      您应该能够扩展它以涵盖其他情况或用作可迭代对象本身。

      from gzip import GzipFile
      
      class GzipWrap(object):
          # input is a filelike object that feeds the input
          def __init__(self, input, filename = None):
              self.input = input
              self.buffer = ''
              self.zipper = GzipFile(filename, mode = 'wb', fileobj = self)
      
          def read(self, size=-1):
              if (size < 0) or len(self.buffer) < size:
                  for s in self.input:
                      self.zipper.write(s)
                      if size > 0 and len(self.buffer) >= size:
                          self.zipper.flush()
                          break
                  else:
                      self.zipper.close()
                  if size < 0:
                      ret = self.buffer
                      self.buffer = ''
              else:
                  ret, self.buffer = self.buffer[:size], self.buffer[size:]
              return ret
      
          def flush(self):
              pass
      
          def write(self, data):
              self.buffer += data
      
          def close(self):
              self.input.close()
      

      【讨论】:

      • (好吧,我明白你的观点,将 'self' 传递给 GzipFile 并不是特别优雅……但我仍然认为这是一个巧妙的技巧)。
      • 我已经更正了代码中的一个小错误。当读取大小
      • 为什么标准库没有提供这样的东西?不流式传输的 gzip 实用程序有什么用?通常会出现整个文件无法放入内存的情况(毕竟它被 gzip 压缩是有原因的)。
      • 我猜他们将它包含在内部使用。还没有检查它是否在去年发生了变化,想...
      • 此答案的当前版本一次将一行读入内存。如果您正在压缩的文件的换行符很少,这可能是个问题。
      【解决方案6】:

      将 cStringIO(或 StringIO)模块与 zlib 结合使用:

      >>> import zlib
      >>> from cStringIO import StringIO
      >>> s.write(zlib.compress("I'm a lumberjack"))
      >>> s.seek(0)
      >>> zlib.decompress(s.read())
      "I'm a lumberjack"
      

      【讨论】:

      • 这个问题是,整个输入流必须被加载到内存中(当它被传递到zlib.compress时),然后必须再次加载到内存中当它从zlib.decompress返回时。
      • 如果你使用 StringIO,它永远不会离开内存。您在问题中说您想要一个“类似文件的对象”,这是与文件对象具有相似方法的对象的常见 Python 术语。它没有说明它是否存在于磁盘上。但后来你也建议你不想要一个 gz 文件。您能否更清楚地了解您真正在寻找什么?
      • 呃,对不起 - 是的,那是我的错。在我看来,“类文件对象”意味着“打算分块处理的东西”,但我想这是一个错误的假设。我已经更新了问题。
      • 你看过zlib.compressobj()zlib.decompressobj()吗?非常适合分块。
      • 是的,我有。正如我所提到的(尽管不是很清楚),它们可以工作,但它们的界面不是很标准,这可能取决于我是否正确设置缓冲区大小等内容。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      • 1970-01-01
      • 2016-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多