【问题标题】:What is the most efficient way to get first and last line of a text file?获取文本文件的第一行和最后一行的最有效方法是什么?
【发布时间】:2010-07-27 17:58:24
【问题描述】:

我有一个文本文件,每行都包含一个时间戳。我的目标是找到时间范围。所有的时间都是按顺序排列的,所以第一行是最早的时间,最后一行是最晚的时间。我只需要第一行和最后一行。在 python 中获取这些行的最有效方法是什么?

注意:这些文件的长度相对较大,每个大约 1-2 百万行,我必须为数百个文件执行此操作。

【问题讨论】:

    标签: python file seek


    【解决方案1】:

    要读取文件的第一行和最后一行,您可以...

    • 打开文件,...
    • ...使用内置readline()读取第一行,...
    • ...寻找(移动光标)到文件末尾,...
    • ...后退一步,直到遇到EOL(换行符)和...
    • ...从那里读取最后一行。
    def readlastline(f):
        f.seek(-2, 2)              # Jump to the second last byte.
        while f.read(1) != b"\n":  # Until EOL is found ...
            f.seek(-2, 1)          # ... jump back, over the read byte plus one more.
        return f.read()            # Read all data from this point on.
        
    with open(file, "rb") as f:
        first = f.readline()
        last = readlastline(f)
    

    直接跳转到最后一个字节,防止尾随换行符导致返回空行*。

    每次读取一个字节时,当前偏移量会前移一个,因此每次后退两个字节,经过最近读取的字节和下一个要读取的字节。

    传递给fseek(offset, whence=0)whence 参数表明fseek 应该寻找相对于...的offset 字节位置

    * 正如大多数应用程序(包括printecho)所期望的那样,在写入的每一行都附加一个,并且对缺少尾随换行符的行没有影响。


    效率

    每个 1-2 百万行,我必须为数百个文件执行此操作。

    我对该方法进行了计时,并将其与最佳答案进行了比较。

    10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s.
    100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95.
    

    数百万行会增加差异很多更多。

    用于计时的准确代码:

    with open(file, "rb") as f:
        first = f.readline()     # Read and store the first line.
        for last in f: pass      # Read all lines, keep final value.
    

    修正

    一个更复杂、更难阅读的变体,用于解决 cmets 和此后提出的问题。

    • 解析空文件时返回空字符串,由comment引发。
    • 在没有找到分隔符时返回所有内容,由comment 提出。
    • 避免相对偏移以支持text mode,由comment 提出。
    • UTF16/UTF32 hack,由 comment 记录。

    还增加了对多字节分隔符的支持,readlast(b'X<br>Y', b'<br>', fixed=False)

    请注意,由于文本模式下需要非相对偏移,因此对于大文件,这种变化真的很慢。根据您的需要进行修改,或者根本不使用它,因为您最好使用 f.readlines()[-1] 以文本模式打开文件。

    #!/bin/python3
    
    from os import SEEK_END
    
    def readlast(f, sep, fixed=True):
        r"""Read the last segment from a file-like object.
    
        :param f: File to read last line from.
        :type  f: file-like object
        :param sep: Segment separator (delimiter).
        :type  sep: bytes, str
        :param fixed: Treat data in ``f`` as a chain of fixed size blocks.
        :type  fixed: bool
        :returns: Last line of file.
        :rtype: bytes, str
        """
        bs   = len(sep)
        step = bs if fixed else 1
        if not bs:
            raise ValueError("Zero-length separator.")
        try:
            o = f.seek(0, SEEK_END)
            o = f.seek(o-bs-step)    # - Ignore trailing delimiter 'sep'.
            while f.read(bs) != sep: # - Until reaching 'sep': Read sep-sized block
                o = f.seek(o-step)   #  and then seek to the block to read next.
        except (OSError,ValueError): # - Beginning of file reached.
            f.seek(0)
        return f.read()
    
    def test_readlast():
        from io import BytesIO, StringIO
        
        # Text mode.
        f = StringIO("first\nlast\n")
        assert readlast(f, "\n") == "last\n"
        
        # Bytes.
        f = BytesIO(b'first|last')
        assert readlast(f, b'|') == b'last'
        
        # Bytes, UTF-8.
        f = BytesIO("X\nY\n".encode("utf-8"))
        assert readlast(f, b'\n').decode() == "Y\n"
        
        # Bytes, UTF-16.
        f = BytesIO("X\nY\n".encode("utf-16"))
        assert readlast(f, b'\n\x00').decode('utf-16') == "Y\n"
      
        # Bytes, UTF-32.
        f = BytesIO("X\nY\n".encode("utf-32"))
        assert readlast(f, b'\n\x00\x00\x00').decode('utf-32') == "Y\n"
        
        # Multichar delimiter.
        f = StringIO("X<br>Y")
        assert readlast(f, "<br>", fixed=False) == "Y"
        
        # Make sure you use the correct delimiters.
        seps = { 'utf8': b'\n', 'utf16': b'\n\x00', 'utf32': b'\n\x00\x00\x00' }
        assert "\n".encode('utf8' )     == seps['utf8']
        assert "\n".encode('utf16')[2:] == seps['utf16']
        assert "\n".encode('utf32')[4:] == seps['utf32']
        
        # Edge cases.
        edges = (
            # Text , Match
            (""    , ""  ), # Empty file, empty string.
            ("X"   , "X" ), # No delimiter, full content.
            ("\n"  , "\n"),
            ("\n\n", "\n"),
            # UTF16/32 encoded U+270A (b"\n\x00\n'\n\x00"/utf16)
            (b'\n\xe2\x9c\x8a\n'.decode(), b'\xe2\x9c\x8a\n'.decode()),
        )
        for txt, match in edges:
            for enc,sep in seps.items():
                assert readlast(BytesIO(txt.encode(enc)), sep).decode(enc) == match
    
    if __name__ == "__main__":
        import sys
        for path in sys.argv[1:]:
            with open(path) as f:
                print(f.readline()    , end="")
                print(readlast(f,"\n"), end="")
    

    【讨论】:

    • 这是最简洁的方案,我喜欢。不猜测块大小的好处是它适用于小型测试文件。我添加了几行并将其包装在一个我喜欢称之为tail_n 的函数中。
    • 我喜欢它在纸上,但不能让它发挥作用。 File "mapper1.2.2.py", line 17, in get_last_line f.seek(-2, 2) IOError: [Errno 22] Invalid argument
    • 没关系,文件是空的,derp。无论如何最好的答案。 +1
    • 根据this comment作为答案,这个while f.read(1) != "\n":应该是while f.read(1) != b"\n":
    • 另外记录一下:如果你得到异常io.UnsupportedOperation: can't do nonzero end-relative seeks,你必须分两步完成:首先找到文件的长度,然后添加偏移量,然后将其传递给@987654360 @
    【解决方案2】:

    docs for io module

    with open(fname, 'rb') as fh:
        first = next(fh).decode()
    
        fh.seek(-1024, 2)
        last = fh.readlines()[-1].decode()
    

    这里的变量值为1024:它代表平均字符串长度。我仅选择 1024 为例。如果您估计了平均线长,您可以使用该值乘以 2。

    由于您对行长的可能上限一无所知,因此显而易见的解决方案是遍历文件:

    for line in fh:
        pass
    last = line
    

    您无需担心二进制标志,您可以使用 open(fname)

    ETA:由于您有很多文件要处理,您可以使用random.sample 创建一个包含几十个文件的示例,并在它们上运行此代码以确定最后一行的长度。位置偏移的先验值很大(比如说 1 MB)。这将帮助您估算完整运行的价值。

    【讨论】:

    • 只要行不超过 1024 个字符。
    • 不保证行不超过1024个字符,除了行上的时间戳外,可能还有其他垃圾。
    • @pasbino:你有一些上限吗?
    • 使用fh.seek(-1024, os.SEEK_END) 代替fh.seek(-1024, 2) 可提高可读性。
    • 以下情况不正确:您无需担心二进制标志,您可以使用open(fname) 使用b 标志打开是至关重要的。如果你使用open(fname) 而不是open(fname, 'rb')will get io.UnsupportedOperation: can't do nonzero end-relative seeks
    【解决方案3】:

    这是 SilentGhost 答案的修改版本,可以满足您的需求。

    with open(fname, 'rb') as fh:
        first = next(fh)
        offs = -100
        while True:
            fh.seek(offs, 2)
            lines = fh.readlines()
            if len(lines)>1:
                last = lines[-1]
                break
            offs *= 2
        print first
        print last
    

    这里不需要行长的上限。

    【讨论】:

      【解决方案4】:

      你可以使用unix命令吗?我认为使用head -1tail -n 1 可能是最有效的方法。或者,您可以使用简单的fid.readline() 来获取第一行和fid.readlines()[-1],但这可能会占用太多内存。

      【讨论】:

      • 嗯,那么创建一个子进程来执行这些命令是最有效的方式吗?
      • 如果你有 unix,那么os.popen("tail -n 1 %s" % filename).read() 可以很好地得到最后一行。
      • +1 表示头 -1 和尾 -1。 fid.readlines()[-1] 对于大文件不是一个好的解决方案。
      • os.popen("tail -n 1 %s" % filename).read() --> 2.6 版后已弃用
      【解决方案5】:

      这是我的解决方案,也兼容 Python3。它也管理边界案例,但缺少对 utf-16 的支持:

      def tail(filepath):
          """
          @author Marco Sulla (marcosullaroma@gmail.com)
          @date May 31, 2016
          """
      
          try:
              filepath.is_file
              fp = str(filepath)
          except AttributeError:
              fp = filepath
      
          with open(fp, "rb") as f:
              size = os.stat(fp).st_size
              start_pos = 0 if size - 1 < 0 else size - 1
      
              if start_pos != 0:
                  f.seek(start_pos)
                  char = f.read(1)
      
                  if char == b"\n":
                      start_pos -= 1
                      f.seek(start_pos)
      
                  if start_pos == 0:
                      f.seek(start_pos)
                  else:
                      char = ""
      
                      for pos in range(start_pos, -1, -1):
                          f.seek(pos)
      
                          char = f.read(1)
      
                          if char == b"\n":
                              break
      
              return f.readline()
      

      它的灵感来自Trasp's answerAnotherParker's comment

      【讨论】:

        【解决方案6】:

        首先以读取模式打开文件。然后使用 readlines() 方法逐行读取。所有行都存储在一个列表中。现在您可以使用列表切片来获取文件的第一行和最后一行。

            a=open('file.txt','rb')
            lines = a.readlines()
            if lines:
                first_line = lines[:1]
                last_line = lines[-1]
        

        【讨论】:

        • 我正在搜索这个,我不需要第一行和最后一行,所以 lines[1,-2] 给出了标题和页脚之间的文本。
        • 此选项无法处理空文件。
        • 超大文件崩溃
        【解决方案7】:
        w=open(file.txt, 'r')
        print ('first line is : ',w.readline())
        for line in w:  
            x= line
        print ('last line is : ',x)
        w.close()
        

        for 循环遍历这些行,x 获取最后一次迭代的最后一行。

        【讨论】:

        • 这应该是公认的答案。我不知道为什么其他答案中的低级 io 都乱七八糟?
        • @GreenAsJade 我的理解是,“乱来”是为了避免从头到尾读取整个文件。这在大文件上可能效率低下。
        【解决方案8】:
        with open("myfile.txt") as f:
            lines = f.readlines()
            first_row = lines[0]
            print first_row
            last_row = lines[-1]
            print last_row
        

        【讨论】:

        • 您能解释一下为什么您的解决方案会更好吗?
        • 嗨,我发现自己有同样的需要,删除文本文件中最后一行的最后一个逗号,这样我就很容易找到它;我当时想分享一下。这个方案一直简单实用,立竿见影,但不知道效率上是不是最快。你能告诉我什么?
        • 好吧,它必须读取和处理整个文件,所以这似乎是效率最低的方式。
        • 好的...所以,如果您不知道字符串长度,哪种方法最好?我需要尝试另一个(stackoverflow.com/a/3346492/2149425)。谢谢!
        • 使用f.readlines()[-1] 代替新变量。 0 = 第一行1 = 第二行-1 = 最后一行, -2 = 最后一行之前的行...
        【解决方案9】:

        这是@Trasp 答案的扩展,它具有处理只有一行的文件的极端情况的附加逻辑。如果您反复想要读取持续更新的文件的最后一行,处理这种情况可能会很有用。如果没有这个,如果你试图抓取刚刚创建的文件的最后一行,并且只有一行,IOError: [Errno 22] Invalid argument 将被引发。

        def tail(filepath):
            with open(filepath, "rb") as f:
                first = f.readline()      # Read the first line.
                f.seek(-2, 2)             # Jump to the second last byte.
                while f.read(1) != b"\n": # Until EOL is found...
                    try:
                        f.seek(-2, 1)     # ...jump back the read byte plus one more.
                    except IOError:
                        f.seek(-1, 1)
                        if f.tell() == 0:
                            break
                last = f.readline()       # Read last line.
            return last
        

        【讨论】:

          【解决方案10】:

          没有人提到使用 reversed:

          f=open(file,"r")
          r=reversed(f.readlines())
          last_line_of_file = r.next()
          

          【讨论】:

          • .readlines() 会一次性将文件中的所有行读入内存——这不是解决这个问题的方法
          【解决方案11】:

          获取第一行非常容易。对于最后一行,假设您知道行长的大致上限,os.lseekSEEK_END 的某个数量找到倒数第二行结束,然后找到readline() 最后一行。

          【讨论】:

          • 我没有行长的近似上限
          【解决方案12】:
          with open(filename, "rb") as f:#Needs to be in binary mode for the seek from the end to work
              first = f.readline()
              if f.read(1) == '':
                  return first
              f.seek(-2, 2)  # Jump to the second last byte.
              while f.read(1) != b"\n":  # Until EOL is found...
                  f.seek(-2, 1)  # ...jump back the read byte plus one more.
              last = f.readline()  # Read last line.
              return last
          

          上述答案是上述答案的修改版本,用于处理文件中只有一行的情况

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多