【问题标题】:Python how to read N number of lines at a timePython如何一次读取N行
【发布时间】:2011-09-14 05:13:07
【问题描述】:

我正在编写一个代码来一次获取一个巨大的文本文件(几 GB)N 行,处理该批处理,然后移动到接下来的 N 行,直到我完成整个文件。 (我不在乎最后一批是不是完美的尺寸)。

我一直在阅读有关使用 itertools islice 进行此操作的信息。我想我已经完成了一半:

from itertools import islice
N = 16
infile = open("my_very_large_text_file", "r")
lines_gen = islice(infile, N)

for lines in lines_gen:
     ...process my lines...

麻烦的是我想处理下一批16行,但我错过了一些东西

【问题讨论】:

标签: python lines itertools


【解决方案1】:

islice() 可用于获取迭代器的下一个 n 项。因此,list(islice(f, n)) 将返回文件f 的下一个n 行的列表。在循环中使用它会给你n 行的文件。在文件末尾,列表可能会更短,最后调用会返回一个空列表。

from itertools import islice
with open(...) as f:
    while True:
        next_n_lines = list(islice(f, n))
        if not next_n_lines:
            break
        # process next_n_lines

另一种方法是使用grouper pattern

with open(...) as f:
    for next_n_lines in izip_longest(*[f] * n):
        # process next_n_lines

【讨论】:

  • 这些天我在学习python,有一个问题,理想情况下,如果您正在读取数据库或记录文件,则需要将记录标记为已读(需要另一列),然后在下一个批处理你将开始处理下一个未标记的记录,这里是如何实现的?尤其是这里next_n_lines = list(islice(infile, n))
  • @zengr:我不明白你的问题。 list(islice(infile, n)) 将从文件中获取下一块 n 行。文件知道你已经阅读了什么,你可以继续阅读。
  • @Sven 说,我的批处理作业每天运行一次。我有一个 1M 行的巨大文本文件。但是,我只想在第一天阅读前 1000 行。作业停止。现在,第 2 天:我应该从第 1001 行开始处理同一个文件。那么,除了将行号数存储在其他地方之外,您如何维护它。
  • @zengr:您必须将计数器存储在某个地方。这是一个完全不相关的问题——使用右上角的“”按钮。
  • @dhfromkorea:我建议为此使用自定义生成器功能,请参阅gist.github.com/smarnach/75146be0088e7b5c503f
【解决方案2】:

使用来自What is the most “pythonic” way to iterate over a list in chunks?的chunker函数:

from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return izip_longest(*args, fillvalue=fillvalue)


with open(filename) as f:
    for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk
        """process lines like 
        lines[0], lines[1] , ... , lines[chunk_size-1]"""

【讨论】:

  • @Sven Marnach;抱歉,那个“grouper”一定是“chunker”。但我认为(我不太了解你的)它对你的 grouper 功能也是如此。编辑:不,它没有。
  • 仍然令人困惑。 1、chunker()定义两个参数,调用三个。 2. 将f 传递为seq 将尝试对文件对象进行切片,这根本行不通。您只能对序列进行切片。
  • @Sven Marnach;实际上,首先我从该问题的答案中获取了第一个答案,为此创建了代码,并认为第二个答案更好,并更改了函数,但我忘了更改函数调用。你对切片是对的,我的错误,试图纠正它。谢谢。
【解决方案3】:

这是使用groupby的另一种方式:

from itertools import count, groupby

N = 16
with open('test') as f:
    for g, group in groupby(f, key=lambda _, c=count(): c.next()/N):
        print list(group)

工作原理:

基本上 groupby() 将根据 key 参数的返回值对行进行分组,而 key 参数是 lambda 函数 lambda _, c=count(): c.next()/N 并使用当 @ 时 c 参数将绑定到 count() 的事实987654324@ 所以每次groupby() 都会调用 lambda 函数并计算返回值来确定分组线的分组器:

# 1 iteration.
c.next() => 0
0 / 16 => 0
# 2 iteration.
c.next() => 1
1 / 16 => 0
...
# Start of the second grouper.
c.next() => 16
16/16 => 1   
...

【讨论】:

    【解决方案4】:

    这个问题似乎假设通过一次读取 N 行块中的“巨大的文本文件”可以获得效率。这在已经高度优化的stdio 库上添加了一个缓冲应用层,增加了复杂性,并且可能绝对不会给您带来任何好处。

    因此:

    with open('my_very_large_text_file') as f:
        for line in f:
            process(line)
    

    在时间、空间、复杂性和可读性方面可能优于任何替代方案。

    另请参阅Rob Pike's first two rulesJackson's Two RulesPEP-20 The Zen of Python。如果你真的只是想玩islice,你应该忽略大文件的东西。

    【讨论】:

    • 嗨!我必须以 N 行的块处理我的巨大文本文件的原因是我从每组 N 中选择一个随机行。这是用于生物信息学分析,我想制作一个具有相同表示的较小文件整个数据集。并非所有数据在生物学中都是平等创建的!可能有一种不同的(也许更好?)方法可以从一个巨大的数据集中选择 X 条均匀分布的随机线,但这是我首先想到的。感谢您的链接!
    • @brokentypewriter 这是一个完全不同的问题,有更多统计上有用的样本。我将寻找现成的东西,并在这里将其变成一个新问题。当我这样做时,我会在这里放一个链接。自相关是一个可悲的人工制品。
    • 我在这个问题中回答了这个问题:stackoverflow.com/questions/6335839/…
    【解决方案5】:

    假设“批处理”意味着要一次处理所有 16 个记录而不是单独处理,一次读取文件一条记录并更新计数器;当计数器达到 16 时,处理该组。

    interim_list = []
    infile = open("my_very_large_text_file", "r")
    ctr = 0
    for rec in infile:
        interim_list.append(rec)
        ctr += 1
        if ctr > 15:
            process_list(interim_list)
            interim_list = []
            ctr = 0
    
    

    the final group

    process_list(interim_list)

    【讨论】:

      【解决方案6】:

      由于添加了从文件中选择的行在统计上均匀分布的要求,因此我提供了这种简单的方法。

      """randsamp - extract a random subset of n lines from a large file"""
      
      import random
      
      def scan_linepos(path):
          """return a list of seek offsets of the beginning of each line"""
          linepos = []
          offset = 0
          with open(path) as inf:     
              # WARNING: CPython 2.7 file.tell() is not accurate on file.next()
              for line in inf:
                  linepos.append(offset)
                  offset += len(line)
          return linepos
      
      def sample_lines(path, linepos, nsamp):
          """return nsamp lines from path where line offsets are in linepos"""
          offsets = random.sample(linepos, nsamp)
          offsets.sort()  # this may make file reads more efficient
      
          lines = []
          with open(path) as inf:
              for offset in offsets:
                  inf.seek(offset)
                  lines.append(inf.readline())
          return lines
      
      dataset = 'big_data.txt'
      nsamp = 5
      linepos = scan_linepos(dataset) # the scan only need be done once
      
      lines = sample_lines(dataset, linepos, nsamp)
      print 'selecting %d lines from a file of %d' % (nsamp, len(linepos))
      print ''.join(lines)
      

      我在一个包含 1.7GB 磁盘的 300 万行模拟数据文件上对其进行了测试。在我不太热的桌面上,scan_linepos 占据了大约 20 秒的运行时间。

      只是为了检查sample_lines 的性能,我使用了timeit 模块

      import timeit
      t = timeit.Timer('sample_lines(dataset, linepos, nsamp)', 
              'from __main__ import sample_lines, dataset, linepos, nsamp')
      trials = 10 ** 4
      elapsed = t.timeit(number=trials)
      print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000,
              elapsed, (elapsed/trials) * (10 ** 6))
      

      对于nsamp的各种值;当 nsamp 为 100 时,单个 sample_lines 在 460µs 内完成,并以每次调用 47ms 线性扩展至 10k 个样本。

      下一个问题自然是Random is barely random at all?,答案是“亚密码学,但对生物信息学来说肯定没问题”。

      【讨论】:

      • @brokentypewriter - 感谢您从我的真实工作中愉快地转移 o.O
      • @msw 很棒的解决方案。它运行得非常快,我喜欢 random.sample 无需更换即可取样。唯一的问题是我在编写输出文件时出现内存错误......但我可能可以自己修复它。 (我将尝试的第一件事是一次编写一行输出文件,而不是将所有行连接在一起)。感谢您提供出色的解决方案!我有 900 万行,循环采样 11 次,所以节省时间的措施很棒!操作列表并将所有行加载到列表中运行时间太长了。
      • @msw 我已将其修复为一次将每一行写入 outfile 以避免内存问题。一切运行良好!运行需要 4 分 25 秒,这比运行以前的版本(迭代列表)要好 2 多个小时。我真的很喜欢这个解决方案只是将从偏移量中采样的行加载到内存中。这是一个简洁而有效的技巧。我可以说我今天学到了一些新东西!
      • @brokentypewriter - 很高兴能提供帮助,但该方法的功劳归功于 Kernighan 和 Plauger “Pascal 中的软件工具”(1981 年),他们使用此索引方法在一种没有原生字符类型的语言!有些技巧永远不会过时。
      • @brokentypewriter, msw: scan_linepos() 不包括列表中的偏移量 0,但它确实包括最后一行的偏移量。这意味着样本从不包括第一行,但如果超过最后一行的偏移量被命中,则可能包括一个空行。最简单的解决方法是交换 for 循环中的两行。
      猜你喜欢
      • 2011-08-15
      • 1970-01-01
      • 2012-12-04
      • 2020-02-25
      • 1970-01-01
      • 1970-01-01
      • 2011-01-30
      • 2019-02-28
      相关资源
      最近更新 更多