【问题标题】:How can I read large text files line by line, without loading it into memory?如何逐行读取大型文本文件,而不将其加载到内存中?
【发布时间】:2011-09-22 10:49:41
【问题描述】:

我需要逐行读取一个大文件。假设该文件超过 5GB,我需要读取每一行,但显然我不想使用readlines(),因为它会在内存中创建一个非常大的列表。

下面的代码将如何处理这种情况? xreadlines 本身是不是一一读入内存?需要生成器表达式吗?

f = (line for line in open("log.txt").xreadlines())  # how much is loaded in memory?

f.next()  

另外,我该怎么做才能以相反的顺序阅读这篇文章,就像 Linux tail 命令一样?

我找到了:

http://code.google.com/p/pytailer/

python head, tail and backward read by lines of a text file

两者都工作得很好!

【问题讨论】:

标签: python python-2.x


【解决方案1】:

我提供这个答案是因为 Keith 的回答虽然简洁,但并没有明确地关闭文件

with open("log.txt") as infile:
    for line in infile:
        do_something_with(line)

【讨论】:

  • 问题仍然是,“for line in file”会将我的 5GB 行加载到内存中吗?还有,我怎样才能从 tail 中读取?
  • @rochacbruno,它一次只读取一行。读取下一行时,除非您在其他地方存储了对它的引用,否则前一行将被垃圾回收
  • @rochacbruno,不幸的是,以相反的顺序阅读这些行并不容易有效地完成。通常,您希望以合理大小的块(千字节到兆字节)从文件末尾读取并拆分换行符(或您平台上的任何行结束字符)
  • 谢谢!找到了尾解决方案stackoverflow.com/questions/5896079/…
  • @bawejakunal,你的意思是如果一行太长而无法一次加载到内存中?这对于 text 文件来说是不寻常的。您可以使用chunk = infile.read(chunksize) 来读取有限大小的块,而不考虑它们的内容,而不是使用迭代行的for 循环。您必须自己在块中搜索换行符。
【解决方案2】:

您需要做的就是使用文件对象作为迭代器。

for line in open("log.txt"):
    do_something_with(line)

在最近的 Python 版本中使用上下文管理器更好。

with open("log.txt") as fileobject:
    for line in fileobject:
        do_something_with(line)

这也会自动关闭文件。

【讨论】:

  • 那不是将整个文件加载到内存中?
  • 第一个例子中的循环后不应该关闭文件吗?
  • @maciejwww 是的,但我没有让它看起来更像 OP 示例。第二个示例使用with 语句是一个“上下文管理器”,可以自动关闭文件对象。
【解决方案3】:

老派的方法:

fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()

【讨论】:

  • 次要备注:为了异常安全,建议使用'with'语句,在你的情况下是“with open(filename, 'rt') as fh:”
  • @prokher:是的,但我确实称其为“老派”。
【解决方案4】:

您最好使用迭代器。
相关:fileinput — Iterate over lines from multiple input streams.

来自文档:

import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
    process(line)

这将避免一次将整个文件复制到内存中。

【讨论】:

  • 虽然文档将 sn-p 显示为“典型用途”,但在循环结束时使用它不会调用返回的 FileInput 类对象的 close() 方法——所以我会避免以这种方式使用它。在 Python 3.2 中,他们终于使 fileinput 与解决此问题的上下文管理器协议兼容(但代码仍然不会按照所示方式编写)。
【解决方案5】:

请试试这个:

with open('filename','r',buffering=100000) as f:
    for line in f:
        print line

【讨论】:

  • 请解释一下?
  • 来自 Python 的官方文档:link 可选的缓冲参数指定文件所需的缓冲区大小:0 表示无缓冲,1 表示行缓冲,任何其他正值表示使用(大约)该大小的缓冲区(以字节为单位)。负缓冲意味着使用系统默认值,通常对 tty 设备进行行缓冲,对其他文件进行完全缓冲。如果省略,则使用系统默认值
  • 节省了我的一天,就我而言,>~4gb 文件和两个文件处理程序(一个读取,另一个写入)python 挂起,现在很好!谢谢。
  • @jyotidas 虽然我喜欢这种方法,但您会冒着将文本中的行分成块的风险。我亲自看到了这一点,这意味着如果您像我一样在文件中搜索字符串,我会错过一些,因为它们所在的行被分成了块。有没有办法解决这个问题?使用 readlines 效果不佳,因为我记错了
【解决方案6】:

如果文件中没有换行符,请执行以下操作:

with open('large_text.txt') as f:
  while True:
    c = f.read(1024)
    if not c:
      break
    print(c)

【讨论】:

  • 虽然我喜欢这种方法,但您会冒着将文本中的行分成块的风险。我亲自看到了这一点,这意味着如果您像我一样在文件中搜索字符串,我会错过一些,因为它们所在的行被分成了块。有没有办法解决这个问题?使用 readlines 效果不佳,因为我记错了@Ariel Cabib
【解决方案7】:

我无法相信这会像@john-la-rooy 的回答看起来那么简单。因此,我使用逐行读写重新创建了cp 命令。速度太快了。

#!/usr/bin/env python3.6

import sys

with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)

【讨论】:

  • 注意:因为 python 的 readline 标准化了行尾,这具有将 DOS 行尾 \r\n 的文档转换为 Unix 行尾 \n 的副作用。我搜索这个主题的全部原因是我需要转换一个接收混乱行尾的日志文件(因为开发人员盲目地使用了各种 .NET 库)。我震惊地发现,在我最初的速度测试之后,我不需要回去和rstrip 行。已经很完美了!
【解决方案8】:

blaze 项目在过去 6 年中取得了长足的进步。它有一个简单的 API,涵盖了有用的 pandas 功能子集。

dask.dataframe 在内部负责分块,支持许多可并行化的操作,并允许您轻松地将切片导出回 pandas 以进行内存中的操作。

import dask.dataframe as dd

df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows

# iterate rows
for idx, row in df.iterrows():
    ...

# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()

# slice by column
df[df.my_field=='XYZ'].compute()

【讨论】:

    【解决方案9】:

    这是加载任何大小的文本文件而不会导致内存问题的代码。 它支持千兆字节大小的文件

    https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d

    下载文件 data_loading_utils.py 并将其导入您的代码中

    用法

    import data_loading_utils.py.py
    file_name = 'file_name.ext'
    CHUNK_SIZE = 1000000
    
    
    def process_lines(data, eof, file_name):
    
        # check if end of file reached
        if not eof:
             # process data, data is one single line of the file
    
        else:
             # end of file reached
    
    data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=self.process_lines)
    

    process_lines 方法是回调函数。它将为所有行调用,参数数据一次代表文件的一行。

    您可以根据您的机器硬件配置配置变量CHUNK_SIZE

    【讨论】:

    • 虽然我喜欢这种方法,但您会冒着将文本中的行分成块的风险。我亲自看到了这一点,这意味着如果您像我一样在文件中搜索字符串,我会错过一些,因为它们所在的行被分成了块。有没有办法解决这个问题?使用 readlines 效果不佳,因为我记错了
    【解决方案10】:

    这个怎么样? 把你的文件分成块然后逐行读取,因为当你读取一个文件时,你的操作系统会缓存下一行。如果您逐行读取文件,则无法有效利用缓存信息。

    相反,将文件分成块并将整个块加载到内存中,然后进行处理。

    def chunks(file,size=1024):
        while 1:
    
            startat=fh.tell()
            print startat #file's object current position from the start
            fh.seek(size,1) #offset from current postion -->1
            data=fh.readline()
            yield startat,fh.tell()-startat #doesnt store whole list in memory
            if not data:
                break
    if os.path.isfile(fname):
        try:
            fh=open(fname,'rb') 
        except IOError as e: #file --> permission denied
            print "I/O error({0}): {1}".format(e.errno, e.strerror)
        except Exception as e1: #handle other exceptions such as attribute errors
            print "Unexpected error: {0}".format(e1)
        for ele in chunks(fh):
            fh.seek(ele[0])#startat
            data=fh.read(ele[1])#endat
            print data
    

    【讨论】:

    • 这看起来很有希望。这是按字节还是按行加载?如果按字节计算,我担心行会被破坏。我们如何一次加载 1000 行并进行处理?
    【解决方案11】:

    谢谢!我最近转换为 python 3,并且对使用 readlines(0) 读取大文件感到沮丧。这解决了问题。但是为了得到每一行,我必须做几个额外的步骤。每行前面都有一个“b”,我猜它是二进制格式。使用“decode(utf-8)”将其更改为 ascii。

    然后我必须删除每行中间的“=\n”。

    然后我在新行拆分行。

    b_data=(fh.read(ele[1]))#endat This is one chunk of ascii data in binary format
            a_data=((binascii.b2a_qp(b_data)).decode('utf-8')) #Data chunk in 'split' ascii format
            data_chunk = (a_data.replace('=\n','').strip()) #Splitting characters removed
            data_list = data_chunk.split('\n')  #List containing lines in chunk
            #print(data_list,'\n')
            #time.sleep(1)
            for j in range(len(data_list)): #iterate through data_list to get each item 
                i += 1
                line_of_data = data_list[j]
                print(line_of_data)
    

    这是从 Arohi 代码中“打印数据”上方开始的代码。

    【讨论】:

      【解决方案12】:

      我找到的最佳解决方案,我在 330 MB 的文件上进行了尝试。

      lineno = 500
      line_length = 8
      with open('catfour.txt', 'r') as file:
          file.seek(lineno * (line_length + 2))
          print(file.readline(), end='')
      

      其中 line_length 是单行中的字符数。例如“abcd”的行长为 4。

      我在行长中添加了 2 以跳过 '\n' 字符并移至下一个字符。

      【讨论】:

        【解决方案13】:

        我意识到这个问题在很久以前就已经得到了回答,但是这里有一种并行执行的方法,而不会杀死您的内存开销(如果您尝试将每一行都放入池中,就会出现这种情况)。显然,将 readJSON_line2 函数换成有意义的东西——这只是为了说明这一点!

        加速将取决于文件大小和您对每一行所做的操作 - 但对于小文件并仅使用 JSON 阅读器读取它的最坏情况,我看到使用以下设置的 ST 的性能相似。

        希望对外面的人有用:

        def readJSON_line2(linesIn):
          #Function for reading a chunk of json lines
           '''
           Note, this function is nonsensical. A user would never use the approach suggested 
           for reading in a JSON file, 
           its role is to evaluate the MT approach for full line by line processing to both 
           increase speed and reduce memory overhead
           '''
           import json
        
           linesRtn = []
           for lineIn in linesIn:
        
               if lineIn.strip() != 0:
                   lineRtn = json.loads(lineIn)
               else:
                   lineRtn = ""
                
               linesRtn.append(lineRtn)
        
           return linesRtn
        
        
        
        
        # -------------------------------------------------------------------
        if __name__ == "__main__":
           import multiprocessing as mp
        
           path1 = "C:\\user\\Documents\\"
           file1 = "someBigJson.json"
        
           nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
           nChunk = 1000 # How many lines are in each chunk
           #Both of the above will require balancing speed against memory overhead
        
           iJob = 0  #Tracker for SMP jobs submitted into pool
           iiJob = 0  #Tracker for SMP jobs extracted back out of pool
        
           jobs = []  #SMP job holder
           MTres3 = []  #Final result holder
           chunk = []  
           iBuffer = 0 # Buffer line count
           with open(path1+file1) as f:
              for line in f:
                    
                  #Send to the chunk
                  if len(chunk) < nChunk:
                      chunk.append(line)
                  else:
                      #Chunk full
                      #Don't forget to add the current line to chunk
                      chunk.append(line)
                        
                      #Then add the chunk to the buffer (submit to SMP pool)                  
                      jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
                      iJob +=1
                      iBuffer +=1
                      #Clear the chunk for the next batch of entries
                      chunk = []
                                    
                  #Buffer is full, any more chunks submitted would cause undue memory overhead
                  #(Partially) empty the buffer
                  if iBuffer >= nBuffer:
                      temp1 = jobs[iiJob].get()
                      for rtnLine1 in temp1:
                          MTres3.append(rtnLine1)
                      iBuffer -=1
                      iiJob+=1
                    
              #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
              if chunk:
                  jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
                  iJob +=1
                  iBuffer +=1
        
              #And gather up the last of the buffer, including the final chunk
              while iiJob < iJob:
                  temp1 = jobs[iiJob].get()
                  for rtnLine1 in temp1:
                      MTres3.append(rtnLine1)
                  iiJob+=1
        
           #Cleanup
           del chunk, jobs, temp1
           pool.close()
        

        【讨论】:

          【解决方案14】:

          当您想要并行工作并且只读取数据块但使用新行保持干净时,这可能很有用。

          def readInChunks(fileObj, chunkSize=1024):
              while True:
                  data = fileObj.read(chunkSize)
                  if not data:
                      break
                  while data[-1:] != '\n':
                      data+=fileObj.read(1)
                  yield data
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2011-01-16
            • 1970-01-01
            • 2020-06-13
            • 2023-03-29
            • 2022-01-01
            • 2011-08-17
            • 1970-01-01
            相关资源
            最近更新 更多