【问题标题】:Efficient file buffering & scanning methods for large files in pythonpython中大文件的高效文件缓冲和扫描方法
【发布时间】:2011-01-26 03:55:20
【问题描述】:

我遇到的问题的描述有点复杂,我宁愿提供更完整的信息。对于没有耐心的人,这是我可以总结的最简短的方式:

什么是最快的(最少的执行 time) 将文本文件拆分为的方式 大小为 N 的所有(重叠)子字符串(绑定 N,例如 36) 同时抛出换行符。

我正在编写一个模块来解析基于 FASTA ascii 的基因组格式的文件。这些文件包含所谓的“hg18”人类参考基因组,如果您愿意,可以从 UCSC genome browser 下载(加油!)。

您会注意到,基因组文件由 chr[1..22].fa 和 chr[XY].fa 以及一组本模块中未使用的其他小文件组成。

已经存在一些用于解析 FASTA 文件的模块,例如 BioPython 的 SeqIO。 (对不起,我会发布一个链接,但我还没有这样做的要点。)不幸的是,我能够找到的每个模块都没有执行我想要执行的特定操作。

我的模块需要将基因组数据(例如,'CAGTACGTCAGACTATACGGAGCTA' 可能是一条线)拆分为每个重叠的 N 长度子字符串。让我举一个例子,使用一个非常小的文件(实际的染色体文件长度在 355 到 2000 万个字符之间)并且 N=8

>>>导入 cStringIO >>>example_file = cStringIO.StringIO("""\ >标题 CAGTcag TFgcACF """) >>>在解析中读取(example_file): ...打印阅读 ... CAGTCAGTF AGTCAGTFG GTCAGTFGC TCAGTFGCA CAGTFGCAC AGTFGCACF

我发现的函数在我能想到的方法中具有绝对最佳的性能是这样的:


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

这可行,但不幸的是,以这种方式解析人类基因组仍需要大约 1.5 小时(见下面的注释)。也许这是我将使用这种方法看到的最好的方法(可能需要进行完整的代码重构,但我想避免它,因为这种方法在代码的其他领域有一些非常具体的优势),但我我想我会把它交给社区。​​p>

谢谢!

  • 请注意,这一次包含大量额外计算,例如计算反向链读取以及对大小约为 5G 的哈希进行哈希表查找。

回答后的结论: 事实证明,使用 fileobj.read() 然后操作生成的字符串(string.replace() 等)与程序的其余部分,所以我使用了这种方法。谢谢大家!

【问题讨论】:

    标签: python performance io bioinformatics fasta


    【解决方案1】:

    你能映射文件并用滑动窗口开始浏览它吗?我写了一个运行非常小的愚蠢的小程序:

    USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py
    

    处理一个 636229 字节的 fasta 文件(通过 http://biostar.stackexchange.com/questions/1759 找到)需要 0.383 秒。

    #!/usr/bin/python
    
    import mmap
    import os
    
      def parse(string, size):
        stride = 8
        start = string.find("\n")
        while start < size - stride:
            print string[start:start+stride]
            start += 1
    
    fasta = open("small.fasta", 'r')
    fasta_size = os.stat("small.fasta").st_size
    fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    parse(fasta_map, fasta_size)
    

    【讨论】:

    • 由于整个文件都被处理了,你最终不会耗尽内存吗?
    • @dietbuddha,这是一种风险,在 32 位机器上,您真正可以处理的最大文件大约是 2 GB,在 64 位机器上,您当然可以更大。希望操作系统的页面替换算法可以处理通过映射段的顺序读取,以帮助保持驻留集的大小。
    • @Fred Nurk,我忘记了我有一个更大的 fasta 文件:2m23.355 秒内有 242838416 字节。每秒输出 190 万个字符串。
    • 更接近真实测试。当然,我们无法比较不同机器上的挂钟时间,但这个数据集似乎确实处于足够大的阈值,需要特别考虑。实际上,我认为这是错误的看法,我已经删除了我之前的评论。
    • @Fred Nurk 30s for his 数据集听起来令人印象深刻。 :) 我迫不及待地想知道什么是最好的。 :)
    【解决方案2】:

    一些经典的 IO 绑定更改。

    • 使用os.read 等较低级别的读取操作并读入较大的固定缓冲区。
    • 使用线程/多处理,其中一个读取和缓冲,而其他进程。
    • 如果您有多个处理器/机器,请使用 multiprocessing/mq 在 CPU 之间分配处理,例如 map-reduce。

    使用较低级别的读取操作不会有太多的重写。其他的将是相当大的重写。

    【讨论】:

    • Os.read 似乎没有显着差异(结果在我的回答中)。
    • 感谢您的回答。我正在考虑为 map-reduce 方法重构问题,但我必须承认我的犹豫源于没有看到算法的实际运行,因此对此有点天真。你能给我推荐一些特别的文学作品吗?
    • 我也想添加这个 - 实际上我已经在高度并行化,这显示了巨大的 IO 好处。问了这个问题后,我开始怀疑我的问题其实根本就不是 IO,而是我在那个 IO 上做的转换。
    • 我将其标记为已接受,因为经过几周的测试(包括测试我正在使用此代码的程序的其他部分),我们采用的方法只是读取整个染色体文件obj.read()。 os.read 确实比这更好,但只有很小的幅度。其他人对这个问题提供了相同的答案,但我觉得这个答案以最简洁的方式得到了它。谢谢!
    【解决方案3】:

    我怀疑问题在于您以字符串格式存储了 太多 数据,这对于您的用例来说真的很浪费,以至于您的实际内存不足并颠簸交换. 128 GB 应该足以避免这种情况... :)

    由于您已在 cmets 中指出无论如何您都需要存储其他信息,因此我会选择引用父字符串的单独类。我使用 hg18 的 chromFa.zip 中的 chr21.fa 进行了简短的测试;该文件大约 48MB,不到 1M 行。我这里只有 1GB 的内存,所以我只是在之后丢弃了这些对象。因此,该测试不会显示碎片、缓存或相关问题,但我认为它应该是衡量解析吞吐量的一个很好的起点:

    import mmap
    import os
    import time
    import sys
    
    class Subseq(object):
      __slots__ = ("parent", "offset", "length")
    
      def __init__(self, parent, offset, length):
        self.parent = parent
        self.offset = offset
        self.length = length
    
      # these are discussed in comments:
      def __str__(self):
        return self.parent[self.offset:self.offset + self.length]
    
      def __hash__(self):
        return hash(str(self))
    
      def __getitem__(self, index):
        # doesn't currently handle slicing
        assert 0 <= index < self.length
        return self.parent[self.offset + index]
    
      # other methods
    
    def parse(file, size=8):
      file.readline()  # skip header
      whole = "".join(line.rstrip().upper() for line in file)
      for offset in xrange(0, len(whole) - size + 1):
        yield Subseq(whole, offset, size)
    
    class Seq(object):
      __slots__ = ("value", "offset")
      def __init__(self, value, offset):
        self.value = value
        self.offset = offset
    
    def parse_sep_str(file, size=8):
      file.readline()  # skip header
      whole = "".join(line.rstrip().upper() for line in file)
      for offset in xrange(0, len(whole) - size + 1):
        yield Seq(whole[offset:offset + size], offset)
    
    def parse_plain_str(file, size=8):
      file.readline()  # skip header
      whole = "".join(line.rstrip().upper() for line in file)
      for offset in xrange(0, len(whole) - size + 1):
        yield whole[offset:offset+size]
    
    def parse_tuple(file, size=8):
      file.readline()  # skip header
      whole = "".join(line.rstrip().upper() for line in file)
      for offset in xrange(0, len(whole) - size + 1):
        yield (whole, offset, size)
    
    def parse_orig(file, size=8):
      file.readline() # skip header
      buffer = ''
      for line in file:
        buffer += line.rstrip().upper()
        while len(buffer) >= size:
          yield buffer[:size]
          buffer = buffer[1:]
    
    def parse_os_read(file, size=8):
      file.readline()  # skip header
      file_size = os.fstat(file.fileno()).st_size
      whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
      for offset in xrange(0, len(whole) - size + 1):
        yield whole[offset:offset+size]
    
    def parse_mmap(file, size=8):
      file.readline()  # skip past the header
      buffer = ""
      for line in file:
        buffer += line
        if len(buffer) >= size:
          for start in xrange(0, len(buffer) - size + 1):
            yield buffer[start:start + size].upper()
          buffer = buffer[-(len(buffer) - size + 1):]
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size]
    
    def length(x):
      return sum(1 for _ in x)
    
    def duration(secs):
      return "%dm %ds" % divmod(secs, 60)
    
    
    def main(argv):
      tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
      n = 0
      for fn in tests:
        n += 1
        with open(argv[1]) as f:
          start = time.time()
          length(fn(f))
          end = time.time()
          print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))
    
      fn = parse_mmap
      n += 1
      with open(argv[1]) as f:
        f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
        start = time.time()
        length(fn(f))
        end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))
    
    
    if __name__ == "__main__":
      sys.exit(main(sys.argv))
    

    1  parse                 1m 42s
    2  parse_sep_str         1m 42s
    3  parse_tuple           0m 29s
    4  parse_plain_str       0m 36s
    5  parse_orig            0m 45s
    6  parse_os_read         0m 34s
    7  parse_mmap            0m 37s
    

    前四个是我的代码,而 orig 是你的,最后两个来自这里的其他答案。

    用户定义的对象比元组或纯字符串的创建和收集成本更高!这应该不足为奇,但我没有意识到它会产生如此大的差异(比较 #1 和 #3,它们实际上只在用户定义的类与元组中有所不同)。你说你想用字符串存储额外的信息,比如偏移量(如在 parse 和 parse_sep_str 情况下),所以你可以考虑在 C 扩展模块中实现该类型。不想直接写 C 的可以看看 Cython 和相关的。

    案例#1 和#2 相同是意料之中的:通过指向父字符串,我试图节省内存而不是处理时间,但这个测试没有衡量这一点。

    【讨论】:

    • 谢谢你的回答,但我有两点不同意。首先是这种方法使用的内存非常少。假设返回结果不会保存很长时间(确实如此,但请参见第二点),没有任何对象会在不被收集的情况下留下来。它接近于固定内存。第 2 点是我正在仔细监控 RAM 的使用情况,而且我已经完全掌握了这些信息。 (我正在运行它的机器是一个拥有 128 GB RAM 的野兽,该进程仅使用 ~5G,而不是来自这段代码。)
    • @Fred: ...我对每次通过这个循环实例化一个新对象(Subseq)的目的感到困惑。为什么不只是yield whole[offset: offset+size]
    • @eblume:够公平;我确实认为您保留了结果,这就是我要节省内存而不是函数本身的地方。您仍然可能需要考虑这一点,因为这使用更少的内存(因此可能更适合缓存)并且会导致更少的堆碎片等。基因组很大,但即便如此,考虑到您机器的规格,我发现 1.5 小时长;根据我在 parse 函数中的内容进行调整可能会有所帮助。
    • @Fred:现在我看到了你的代码示例,我想我看到了更多你在做什么。但在某些时候,我需要将结果具体化为字符串,因为我将这些字符串用作哈希表查找。此外,我在主帖中添加了解释 1.5 小时时间包括其他昂贵的操作。上面纯粹列出的函数运行时间接近约 30 分钟(我会很快找到确切的时间以用于基准测试。)
    • Fred 我无法相信你昨晚为此付出的努力(好吧,我所在的夜晚。)非常感谢!我将通过彻底测试和分析每个高性能方法来尝试和尊重它,尽管我现在(在睡个好觉之后)预测我在尝试优化我的 IO 例程时是错误的,并且需要优化我的转换正在执行 IO。
    【解决方案4】:

    我有一个处理文本文件的功能,并在读写和并行计算中使用缓冲区,以及进程的异步工作池。我有 2 个内核、8GB RAM、gnu/linux 的 AMD,可以在不到 1 秒的时间内处理 300000 行,在大约 4 秒内处理 1000000 行,在大约 20 秒内处理大约 4500000 行(超过 220MB):

    # -*- coding: utf-8 -*-
    import sys
    from multiprocessing import Pool
    
    def process_file(f, fo="result.txt", fi=sys.argv[1]):
        fi = open(fi, "r", 4096)
        fo = open(fo, "w", 4096)
        b = []
        x = 0
        result = None
        pool = None
        for line in fi:
            b.append(line)
            x += 1
            if (x % 200000) == 0:
                if pool == None:
                    pool = Pool(processes=20)
                if result == None:
                    result = pool.map_async(f, b)
                else:
                    presult = result.get()
                    result = pool.map_async(f, b)
                    for l in presult:
                        fo.write(l)
                b = []
        if not result == None:
            for l in result.get():
                fo.write(l)
        if not b == []:
            for l in b:
                fo.write(f(l))
        fo.close()
        fi.close()
    

    第一个参数是接收一行的函数,处理并返回结果将写入文件,下一个是输出文件,最后一个是输入文件(如果您在脚本文件中接收作为第一个参数,则不能使用最后一个参数输入)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多