【问题标题】:a text file circular buffer in pythonpython中的文本文件循环缓冲区
【发布时间】:2011-06-09 13:53:10
【问题描述】:

我需要一个 python 脚本,为文本文件中的行实现循环缓冲区,限制为 N 行,如下所示:

        row 1 -> pop
        row 2
        row 3
         |
         |
push -> row N

什么是最好的解决方案?

编辑: 此脚本应创建和维护仅包含最新 N 行的文本文件。然后它应该弹出推入的第一行。就像一个先进先出缓冲区。

【问题讨论】:

  • 您只是想读取文本文件的最后 N 行,还是您的脚本正在创建和维护仅包含您写入的最后 N 行的文本文件?
  • 我的脚本应该创建和维护只包含最新 N 行的文本文件。然后它应该弹出推入的第一行。就像一个先进先出缓冲区。

标签: python file fifo circular-buffer


【解决方案1】:

使用collections.deque。它支持maxlen 参数。

d = collections.deque(maxlen=10)
for line in f:
    d.append(line)
    # ...

【讨论】:

  • 使用文件不起作用fd = open(filename, 'r+'); d = collections.deque(fd, maxlen=10)
  • @s23:对我有用。有什么问题?
  • ...怎么样?在d.append('string') 之后文件为空。谢谢。
【解决方案2】:

试试我的食谱,对不起意大利语用法:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#       fifo(.py)
#       
#       Copyright 2011 Fabio Di Bernardini <fdb@altraqua.com>
#       
#       This program is free software; you can redistribute it and/or modify
#       it under the terms of the GNU General Public License as published by
#       the Free Software Foundation; either version 2 of the License, or
#       (at your option) any later version.
#       
#       This program is distributed in the hope that it will be useful,
#       but WITHOUT ANY WARRANTY; without even the implied warranty of
#       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#       GNU General Public License for more details.
#       
#       You should have received a copy of the GNU General Public License
#       along with this program; if not, write to the Free Software
#       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#       MA 02110-1301, USA.

def string_conditioned(string):
    return string.decode('string_escape').rstrip() + '\n'

def pop(n, size, filename):
    with open(filename, 'r+U') as fd:
        rows = fd.readlines()
    with open(filename, 'w') as fd:
        n = int(n)
        fd.writelines(rows[n:])
        return ''.join(rows[:n])

def trim_fifo(row, size, filename):
    size = int(size)
    with open(filename, 'rU') as fd:
        rows = fd.readlines()
    num_rows = len(rows)
    if num_rows >= size:
        n = string_conditioned(row).count('\n')
        pop(num_rows + n - size, size, filename)

def push(row, size, filename):
    trim_fifo(row, size, filename)
    with open(filename, 'a') as fd:
        fd.write(string_conditioned(row))
    return ''

def main():
    import sys
    try:
        command  = sys.argv[1]
        param    = sys.argv[2]
        size     = sys.argv[3]
        filename = sys.argv[4]
        sys.stdout.write({
        '--push': push,
        '--pop' : pop,
        }[command](param, size, filename))
    except Exception, e:
        print r"""
Uso:
       fifo --push ROW MAX_ROWS FILE
       fifo --pop  NUM MAX_ROWS FILE

fifo implementa un buffer ad anello di righe di testo, Quando viene inserita
una riga che fa superare il numero massimo di righe (MAX_ROWS) elimina la riga
più vecchia.

Comandi:
  --push    accoda la riga di testo ROW nel FILE rimuovendo le righe più vecchie
            se il file supera MAX_ROWS. Usare '\n' per separare righe multiple.
  --pop     stampa le prime NUM righe e le rimuove dal FILE. MAX_ROWS viene
            ignorato ma deve essere comunque specificato.

Esempi:
       fifo --push 'row_one \n row_two' 10 fifo.txt
       fifo --pop 2 10 fifo.txt
"""
        print e

if __name__ == '__main__':
    main()

【讨论】:

    【解决方案3】:
    import collections
    
    def keep_last_n_and_return_first_of_last_n(filename, n):
        with open(filename, "r") as inp:
             lines= collections.deque(inp, maxlen=n)
        with open(filename, "w") as out:
             out.writelines(lines)
        return lines[0]
    

    【讨论】:

      猜你喜欢
      • 2016-10-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多