【问题标题】:Python File ParsingPython 文件解析
【发布时间】:2013-01-28 18:20:44
【问题描述】:

为了设置上下文,我有一个包含 200-300 个文件的目录,每个文件的大小范围(行数)。我粘贴文件并将它们导出到 csv 文件。我想我上次运行它时 csv 文件有超过 340,000 行。最重要的是,前 8 个文件不断被写入,所以有时在解析时会丢失数据。

现在,每个文件都是这样设置的:

DateTime Message Action ActionDetails

我有代码可以浏览所有文件,解析它们,然后输出到 csv 文件:

for infile in listing:
    _path2 = _path + infile
    f = open(_path2, 'r')
    labels = ['date', 'message', 'action', 'details']
    reader = csv.DictReader(f, labels, delimiter=' ', restkey='rest')

    for line in reader:
        if line.get('rest'):
            line['details'] += ' %s' % (' '.join(line['rest']))
        out_file.write(','.join([infile,line['date'], line['message'], line['action'], line['details']]) + '\n')

    f.close()
out_file.close()

我想知道复制前 8 个文件的“最佳”方法是什么,这样我在解析时就不会丢失数据。最好的意思是花费最少的时间,因为目前运行 python 脚本的总时间约为 35-45 秒。

【问题讨论】:

  • 复制文件并解析副本是否可以接受?
  • 我看过shutil 这样的东西,但不太了解它是否知道我想使用它。
  • @thegrinner 我不明白为什么不这样做。解析后文件将被删除,所以可能。
  • 前 8 个文件是写在文件的前面还是后面?即使您使用shutil 制作副本,它也最多可能是文件的一部分。如果是这种情况,您最好从文件的正确末尾读取(除非它不断被覆盖)。如果您需要整个文件,您可以放入一些逻辑以查看文件在读取时是否仍在增长,并且仅在停止时完成读取。它还取决于您正在运行的盒子。不幸的是,Windows 上的任何文件复制操作都不是原子操作,因此您可能会在那里遇到问题。
  • @PaulSeeb 文件正在被覆盖。看起来第一个被覆盖,然后是 seoncd,第三个,依此类推。这一切都发生在大约一分钟/分半钟内。

标签: python csv file-io export-to-csv


【解决方案1】:

我有点无聊。试穿这个尺寸。我实际上没有机会检查它是否正确解析和写入,但除此之外,我相信它应该在给出一些信息的情况下运行。这个问题是使用排队的好机会。让我知道它的运行速度!

from threading import Thread
import Queue
import os
import time
import sys

# declare some global items
# queue that an author thread can write line items to a csv
write_q = Queue.Queue()

# queue filled with files to parse 
read_q = Queue.Queue()

# queue filled with files that have size change during read. Can
# preload this queue to optimize however program should handle any
# file that changes during operation
moving_q = Queue.Queue()

# given csv labels
labels = ['date', 'message', 'action', 'details']

# global for writer thread so it knows when to close
files_to_parse = True

# parsing function for any number of threads
def file_parser():    
    # Each parser thread will run until the read_q is empty
    while True:
        moving = False
        # Test for a file from the read queue or moving queue 
        try:
            if not moving_q.empty():
                try:
                    f_path = moving_q.get(False)
                    moving = True
                # if the moving queue is empty after trying to read
                # might have been snatched by different thread. Ignore error
                except Queue.Empty:
                    pass
            else:
                # No items left in moving queue so grab non moving file
                f_path = read_q.get(False)
        # all files have been dealt with
        except Queue.Empty:
            print "Done Parsing"
            sys.exit()

        # Following will parse a file and test that the file is not being
        # modified during the read
        with open(f_path, 'r') as f:
            # csv reader setup
            reader = csv.DictReader(f, labels, delimiter=' ', restkey='rest')

            # initillized file size (when we started reading)
            pre = os.path.getsize(f_path)

            # store output items in a list so if file is updated during read
            # we can just ignore those items and read file later
            line_items = []

            # parse the file line by line
            for line in reader:
                # Check that file hasn't been updated
                post = os.path.getsize(f_path)
                if pre != post:
                    # if file has changed put the file back on the queue and clear the output lines
                    moving_q.put(f_path)
                    line_items = None
                    break
                # parse the line and add it to output list
                else:
                    if line.get('rest'):
                        line['details'] += ' %s' % (' '.join(line['rest']))
                        line_items.append(','.join([infile,line['date'], line['message'], line['action'], line['details']]) + '\n')

            # don't want to do reading and writing in same thread. Push
            # all line items onto the write thread for the author to deal with    
            if line_items and moving:
                write_q.put(line_items)
                moving_q.task_done()
            elif line_items and not moving:
                write_q.put(line_items)
                read_q.task_done()

# author thread that will write items to a file as other threads complete
# tasks. Should help speed up IO bound processing
def file_author(out_file):
    with open(out_file,'w') as f:
        # parse files until all the parser threads are running
        while files_to_parse or not read_q.empty():
            # only one writer thread so write as items are put into thread
            if not read_q.empty():
                line_items = write_q.get(False)
                for line_item in line_items:
                    f.write(line_item)
                write_q.task_done()
            # sleep in the downtime so we dont overload PC
            else:
                time.sleep(.1)
    print "Done writting"


if __name__ == "__main__":
    # list of file names as you had before
    listing = []
    outfile = "MyNewCSVfile.csv"

    # You can optimize parsing by adding known "moving files" directly
    # to the moving_queue, however program should handle either way
    for infile in listing:
        _path2 = _path + infile
        write_q.put(_path2)

    # make a writer thread
    t = Thread(target = file_author, args = (outfile,))
    t.daemon = True
    t.start()

    # make some parse threads
    for i in range(10):
        t = Thread(target = file_parser)
        t.daemon = True
        t.start()

    # wait for parser threads to finish work
    read_q.join()
    moving_q.join()

    # close author
    files_to_parse = False
    time.sleep(.1)
    print "Complete"

【讨论】:

  • 哈,没想到会有这么大的反应。不过还是谢谢你。它似乎工作正常,所有输出看起来都正确。我高估了我的运行时间(大约 20 秒),你的代码运行大约 14-15 秒。
  • 希望 cmets 帮助您理解它。我有点了解它,这是一个很好的问题。
猜你喜欢
  • 1970-01-01
  • 2017-07-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-15
  • 2022-01-19
相关资源
最近更新 更多