【问题标题】:Optimize python program to parse two large files at the same time优化python程序同时解析两个大文件
【发布时间】:2020-06-19 13:54:00
【问题描述】:

我正在尝试使用 Python3 同时解析两个大文件。如图所示:

dict = {}
row = {}
with open(file1, "r") as f1, open(file2, "r") as f2:
  zipped = zip(f1, f2)
  for line_f1, line_f2 in zipped:
    # parse the lines and save the line information in a dictionary 
    row = {"ID_1":line_f1[0], "ID_2":line_f2[0], ...}

    # This process takes roughly 0.0005s each time
    # it parses each pair of lines at once and returns an output
    # it doesn't depend on previous lines or lines after
    output = process(row) 

    # output is a string, add it to dict
    if output in dict:
       dict[output] += 1
    else:
       dict[output] = 1
return dict

当我用两个较小的文本文件(每个 30,000 行,文件大小 = 13M)测试上述代码时,完成循环大约需要 150 秒。

当我使用两个大文本文件(每个文件 9,000,000 行,文件大小 = 3.8G)进行测试时,没有循环中的处理步骤,大约需要 670 秒。

当我在流程步骤中使用相同的两个大文本文件进行测试时。我计算出每 10,000 件物品大约需要 60 秒。当迭代次数变大时,时间并没有增长。

但是,当我将此作业提交到共享集群时,一对大文件需要超过 36 小时才能完成处理。我试图弄清楚是否有任何其他方式来处理文件,以便它可以更快。任何建议,将不胜感激。

提前致谢!

【问题讨论】:

  • 如果您将文件分割成块,您可以使用多个 CPU 内核并行处理这些块。之后,您只需对所有作业的结果进行汇总即可。
  • @Błotosmętek 谢谢!那是我正在考虑的事情。但这会严重破坏我的代码结构。如果没有其他改进,我想我必须这样做。

标签: python optimization cluster-computing large-data data-processing


【解决方案1】:

这只是一个假设,但您的进程可能会在每次触发 I/O 以获取一对线时浪费其分配的 CPU 插槽。您可以尝试一次读取多组行并分块处理,这样您就可以充分利用在共享集群上获得的每个 CPU 时间段。

from collections import deque
chunkSize = 1000000 # number of characters in each chunk (you will need to adjust this)
chunk1    = deque([""]) #buffered lines from 1st file
chunk2    = deque([""]) #buffered lines from 2nd file
with open(file1, "r") as f1, open(file2, "r") as f2:
    while chunk1 and chunk2:
        line_f1 = chunk1.popleft()
        if not chunk1:
            line_f1,*more = (line_f1+file1.read(chunkSize)).split("\n")
            chunk1.extend(more)
        line_f2 = chunk2.popleft()
        if not chunk2:
            line_f2,*more = (line_f2+file2.read(chunkSize)).split("\n")
            chunk2.extend(more)
        # process line_f1, line_f2
        ....

其工作方式是读取一大块字符(必须大于最长的行)并将其分解为行。这些行被放置在队列中以进行处理。

由于块大小以字符数表示,因此队列中的最后一行可能不完整。

为了确保行在被处理之前是完整的,当我们到达队列中的最后一行时,会读取另一个块。附加字符被添加到不完整行的末尾,并对组合字符串执行行拆分。因为我们连接了最后(不完整的)行,所以.split("\n") 函数总是适用于从行边界开始的一段文本。

该过程继续(现已完成)最后一行,其余行被添加到队列中。

【讨论】:

  • 谢谢!我一定会试一试的。我有一个问题,在这种情况下,如果 chunkSize 是每个块中的字符数,这是否意味着如果我对所有文件都有固定的大小,并且它要求所有行的长度相同?换句话说,如果块大小是固定的,是否会有半行以line_f1line_f2 结尾?我想我不完全理解它在if not chunk1:中是如何工作的
  • 没有。在我的示例中进行分块的方式,您只需要具有相同数量的行。行边界由line_f1,*more = (line_f1+file1.read(chunkSize)).split("\n") 语句管理
猜你喜欢
  • 1970-01-01
  • 2014-11-07
  • 1970-01-01
  • 2015-10-20
  • 1970-01-01
  • 2018-01-20
  • 2017-08-14
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多