【问题标题】:Optimized processing of very large files超大文件的优化处理
【发布时间】:2011-07-04 13:35:21
【问题描述】:

我的任务相对简单:对于输入文件中的每一行,测试该行是否满足给定的一组条件,如果满足,则将该行的特定列写入新文件。我已经编写了一个执行此操作的 python 脚本,但我需要一些帮助:1)提高速度,2)在列名方面的最佳工作方式(因为列号可能因文件而异),以及 3 ) 指定过滤条件和所需输出列的最佳方式。

1) 我使用的文件包含天文图像的光度测量。每个文件大约 1e6 行乘 150 列浮点数,通常大小超过 1GB。我有一个旧的 AWK 脚本,它将在大约 1 分钟内处理这样的文件;我的 python 脚本需要 5 到 7 分钟。我经常需要调整过滤条件并重新运行几次,直到输出文件是我想要的,所以速度绝对是可取的。我发现 for 循环非常快。这就是我在循环中做事的方式,这会减慢它的速度。与将整行读入内存相比,使用 itemgetter 仅挑选出我想要的列是一个很大的改进,但我不确定我能做些什么来进一步提高速度。这能和 AWK 一样快吗?

2) 我想根据列名而不是列号来工作,因为特定数量(光子计数、背景、信噪比等)的列号可以在文件之间更改。在我的 AWK 脚本中,我总是需要检查指定条件和输出列的列号是否正确,即使过滤和输出适用于相同的数量。我在 python 中的解决方案是创建一个字典,为每个数量分配一个列号。当一个文件有不同的列时,我只需要指定一个新的字典。也许有更好的方法来做到这一点?

3) 理想情况下,我只需要指定输入和输出文件的名称、过滤条件和要输出的所需列,它们会在我的脚本顶部找到,所以我不需要去搜索代码只是为了调整一些东西。我的主要问题是未定义的变量。例如,典型条件是“SNR > 4”,但在开始从测光文件中读取行之前,实际上并未为“SNR”(信噪比)分配值。我的解决方案是使用字符串和 eval/exec 的组合。再次,也许有更好的方法?

我根本没有接受过计算机科学方面的培训(我是天文学专业的研究生)——我通常只是将一些东西拼凑起来并调试直到它起作用。但是,针对我以上三点的优化对我的研究来说变得非常重要。我为这篇冗长的帖子道歉,但我觉得这些细节会有所帮助。除了清理/编码风格之外,您对我的任何和所有建议都将不胜感激。

非常感谢, 杰克

#! /usr/bin/env python2.6

from operator import itemgetter


infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

# names must belong to dicitonary
conditions = 'OBJ <= 2 and SNR1 > 4 and SNR2 > 4 and FLAG1 < 8 and FLAG2 < 8 and (SHARP1 + SHARP2)**2 < 0.075 and (CROWD1 + CROWD2) < 0.1'

input = 'OBJ, SNR1, SNR2, FLAG1, FLAG2, SHARP1, SHARP2, CROWD1, CROWD2'
    # should contain all quantities used in conditions

output = 'X, Y, OBJ, COUNTS1, BG1, ACS1, ERR1, CHI1, SNR1, SHARP1, ROUND1, CROWD1, FLAG1, COUNTS2, BG2, ACS2, ERR2, CHI2, SNR2, SHARP2, ROUND2, CROWD2, FLAG2'

# dictionary of col. numbers for the more important qunatities
columns = dict(EXT=0, CHIP=1, X=2, Y=3, CHI_GL=4, SNR_GL=5, SHARP_GL=6, ROUND_GL=7, MAJAX_GL=8, CROWD_GL=9, OBJ=10, COUNTS1=11, BG1=12, ACS1=13, STD1=14, ERR1=15, CHI1=16, SNR1=17, SHARP1=18, ROUND1=19, CROWD1=20, FWHM1=21, ELLIP1=22, PSFA1=23, PSFB1=24, PSFC1=25, FLAG1=26, COUNTS2=27, BG2=28, ACS2=29, STD2=30, ERR2=31, CHI2=32, SNR2=33, SHARP2=34, ROUND2=35, CROWD2=36, FWHM2=37, ELLIP2=38, PSFA2=39, PSFB2=40, PSFC2=41, FLAG2=42)



f = open(infile)
g = open(outfile, 'w')


# make string that extracts values for testing
input_items = []
for i in input.replace(',', ' ').split():
    input_items.append(columns[i])
input_items = ', '.join(str(i) for i in input_items)

var_assign = '%s = [eval(i) for i in itemgetter(%s)(line.split())]' % (input, input_items) 


# make string that specifies values for writing
output_items = []
for i in output.replace(',', ' ').split():
    output_items.append(columns[i])
output_items = ', '.join(str(i) for i in output_items)

output_values = 'itemgetter(%s)(line.split())' % output_items


# make string that specifies format for writing
string_format = []
for i in output.replace(',', ' ').split():
    string_format.append('%s')
string_format = ' '.join(string_format)+'\n'


# main loop
for line in f:
   exec(var_assign)
   if eval(conditions):
      g.write(string_format % tuple(eval(output_values)))
f.close()
g.close()

【问题讨论】:

  • 你能告诉我们数据文件,以及你想要的输出吗?您还应该将 input.replace(',', ' ').split() 替换为 input.split(",")。
  • 原始光度文件here,输出文件here。至于replace 语句,我想我只是把它放在那里,以防我忘记在inputoutput 字符串中的条目之间留一个空格。不过我同意你的建议,它更干净。
  • 这些文件太大,无法下载。你能用几行几列构建一个例子吗?
  • 抱歉耽搁了——去here获取原始光度文件的样本,here获取我的脚本输出。
  • 另外,感谢迄今为止帮助过我的所有人。我仍在消化所有建议,并试图找出最适合我的方法。我肯定会学到很多新东西!仍然欢迎新的建议,我会让人们知道我发现了什么。

标签: python performance optimization astronomy


【解决方案1】:

你试过熊猫吗?

我相信 OBJ, SNR1, ... 是列名,我希望您在所有行上应用相同的条件。 如果是这种情况,我建议您使用 pandas。

你的代码 sn-p 会变成这样......

import pandas as pd

infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

df = pd.read_csv(infile)

condition = (df['OBJ'] <= 2) & (df['SRN1'] > 4) & (df['SRN2'] > 4) & (df['FLAG1'] < 8) & (df['FLAG2'] < 8) & ((df['SHARP1'] + df['SHARP2'])**2 < 0.075) & ((df['CROWD1'] + df['CROWD2']) < 0.1)
newDf = df[condition]

columnNames = ['col1', 'col2', ...] # column names you want in result

newDf = df[columnNames]

newDf.to_csv(outfile)

【讨论】:

    【解决方案2】:

    如果分析表明在文件的实际读取和解析上花费了大量时间,并且您将多次处理相同的原始文件,您可以尝试创建一种优化的中间文件格式以使用 Python 读取。

    可以尝试一次读取文件,使用pickle/cPickle 解析并输出结果。然后在过滤器脚本中使用 pickle/cpickle 读取中间文件。

    由于对 python 的了解不够好,无法判断这是否比读取每一行并拆分它们更快。 (在 c# 中我会使用二进制序列化程序,但我不知道这在 python 中是否可用)。

    如果磁盘 IO 是一个瓶颈,您也可以尝试压缩输入文件并使用 gzip 模块读取它们。

    【讨论】:

      【解决方案3】:

      这就是我将如何去做这样的事情......

      这运行时间约为 35 秒,而我的机器上的原始脚本运行时间约为 3 分钟。可以添加更多优化(例如,我们只需要将一些列转换为浮点数),但这只会缩短运行时间几秒钟。

      您也可以在这里轻松使用csv.DictReader,正如一些人所建议的那样。我正在避免它,因为您必须定义一种自定义方言,而且如果没有它,只需几行额外的代码就可以完成同样的事情。 (各种csv 模块类还检查在这种特殊情况下您不需要担心的更复杂的行为(例如引用的字符串等)。它们在许多情况下非常非常方便,但它们是在这种情况下有点矫枉过正。)

      请注意,您还可以在调用脚本时轻松添加 infile 和 outfile 名称作为参数,而不是硬编码它们(即infile = sys.argv[0] 等)。这也可以让您轻松地将数据输入或输出...(您可以检查sys.argv 的长度并将infileoutfile 设置为sys.stdin 和/或sys.stdout 相应)

      def main():
          infile = 'ugc4305_1.phot'
          outfile = 'ugc4305_1_filt.phot'
          process_data(infile, outfile)
      
      def filter_conditions(row):
          for key, value in row.iteritems():
              row[key] = float(value)
      
          cond = (row['OBJ'] <= 2 and row['SNR1'] > 4 
             and row['SNR2'] > 4 and row['FLAG1'] < 8 
             and row['FLAG2'] < 8 
             and (row['SHARP1'] + row['SHARP2'])**2 < 0.075 
             and (row['CROWD1'] + row['CROWD2']) < 0.1
             )
          return cond
      
      def format_output(row):
          output_columns = ('X', 'Y', 'OBJ', 'COUNTS1', 'BG1', 'ACS1', 'ERR1', 'CHI1', 
                           'SNR1', 'SHARP1', 'ROUND1', 'CROWD1', 'FLAG1', 'COUNTS2', 
                           'BG2', 'ACS2', 'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 
                           'CROWD2', 'FLAG2')
          delimiter = '\t'
          return delimiter.join((row[name] for name in output_columns))
      
      def process_data(infilename, outfilename):
          column_names = ('EXT', 'CHIP', 'X', 'Y', 'CHI_GL', 'SNR_GL', 'SHARP_GL', 
                          'ROUND_GL', 'MAJAX_GL', 'CROWD_GL', 'OBJ', 'COUNTS1', 
                          'BG1', 'ACS1', 'STD1', 'ERR1', 'CHI1', 'SNR1', 'SHARP1', 
                          'ROUND1', 'CROWD1', 'FWHM1', 'ELLIP1', 'PSFA1', 'PSFB1', 
                          'PSFC1', 'FLAG1', 'COUNTS2', 'BG2', 'ACS2', 'STD2', 
                          'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 'CROWD2', 
                          'FWHM2', 'ELLIP2', 'PSFA2', 'PSFB2', 'PSFC2', 'FLAG2')
      
          with open(infilename) as infile:
              with open(outfilename, 'w') as outfile:
                  for line in infile:
                      line = line.strip().split()
                      row = dict(zip(column_names, line))
                      if filter_conditions(row.copy()):
                          outfile.write(format_output(row) + '\n')
      
      if __name__ == '__main__':
          main()
      

      【讨论】:

      • Awww,你把所有的乐趣都给了他——不过做得很好;)
      • 圣杯!在我的笔记本电脑上运行这需要超过 70 秒,而 AWK 大约需要 60 秒 - 这是速度上的提升。并且输入是灵活的。我认为这几乎包含了 JimB、nmichaels 和 tkerwin 建议的所有内容。你可以确定我整个周末都会研究这个来弄清楚它是如何/为什么起作用的。正如其他人所建议的那样,我还将尝试使用 csv.DictReader 。非常感谢!
      • 一些小问题:当line 转换为process_data 中的列表时,strip() 的用途是什么——你得到的列表与line.split() 不一样吗?另外,为什么filter_conditions 得到row 的副本,而format_output 得到原始的row 字典?谢谢
      • @Jake - line.strip() 删除行尾的空格(即\n)。如果您的分隔符始终是空格,您可以将其取出。我出于习惯将其包括在内(例如,如果您要以逗号分隔,这是必要的。line.split(),将有效地删除尾随的\n,因此在这种情况下不需要它。)filter_conditions 获取副本这样row 中的项目就可以在filter_conditions 函数内转换为浮点数,而无需修改原始字符串。我正在复制原始功能,因此此代码不会尝试重新格式化输出。
      • @JimB - 谢谢! @Jake - 希望它对一些人有所帮助......不过,Jim 是对的,通过实践学习可能比通过例子学习更好。 :)
      【解决方案4】:

      我在这里的第一步是摆脱exec()eval() 调用。每次评估字符串时,都必须对其进行编译,然后执行,这会增加文件每一行的函数调用开销。更不用说,eval 往往会导致代码混乱、难以调试,通常应该避免。

      您可以通过将您的逻辑放入一个易于理解的小函数来开始重构。例如,您可以将eval(conditions) 替换为函数,例如:

      def conditions(d):
          return (d[OBJ] <= 2 and
                  d[SNRI] > 4 and
                  d[SNR2] > 4 and
                  d[FLAG1] < 8 and ...
      

      提示:如果您的某些条件语句失败的可能性较高,请将它们放在第一位,python 将跳过对其余条件的评估。

      我会摆脱列名字典,只需在文件顶部设置一堆变量,然后通过line[COLNAME] 引用列。这可以帮助您简化某些部分,例如条件函数,并且您可以按名称引用列,而无需分配每个变量。

      【讨论】:

      • 我一定会尝试这种方法。我喜欢关于放置条件的提示,这完全有道理。我经常需要为每一行做很多测试。
      【解决方案5】:

      就像nmichaels所说的,你可以使用csv.DictReaderfieldnamesdialect参数来读取这个文件。然后,对于每一行,你都会有一本字典。有了字典,你就不必使用eval,并且可以使用像

      这样的语句
      if data_item['OBJ'] <= 2 and data_item['SNR1']:
           g.write(data_item['X'], data_item['Y'], data_item['OBJ'])
      

      由于所有evals,您现在的操作方式既缓慢又复杂。不需要那么复杂。

      【讨论】:

      • 好的,但我想保持我的if 语句的条件是任意的,这就是我使用字符串和eval(conditions) 的原因。我喜欢使用 csv.DictReader 的想法,因为它可以清理一切,但如果我想要任意的 if 语句,我不确定如何绕过 eval
      • 您的意思是您希望它依赖于用户输入?在这种情况下,您可能希望基于配置文件生成一个函数。如果你坚持使用eval,至少使用compile,这样你就不会每次迭代都解析字符串。
      【解决方案6】:

      我认为您没有提到它,但您的数据看起来像是在 csv 中。你可能会从使用csv.DictReader 中得到很多。您可以一次遍历文件 1 行(避免将整个内容加载到内存中)并通过名称引用列。

      如果您还没有,您还应该看看cProfile,Python 的分析器。它会告诉你程序的哪些部分花费了最多的时间来执行。

      【讨论】:

      • 感谢有关 cProfile 的提示 - 我会看一下,虽然我不确定我是否知道如何重新编码慢位。
      • 另外,我的数据不是 csv,只是简单的空格分隔。列未在文件中命名;我必须自己命名它们,这就是我实现字典的原因。最后,我对for line in f: 的理解是它逐行迭代,而不会将整个内容加载到内存中。无论哪种方式,我都测试了 for 循环,它非常快。是里面的东西很慢(我想我可以用 cProfile 确认),我只是不知道其他的编写方法来使它更快,因为我对 Python 比较陌生。
      • 对不起,我完全误解了你的建议。我会检查 csv.DictReader。
      猜你喜欢
      • 2023-03-13
      • 1970-01-01
      • 2016-03-28
      • 1970-01-01
      • 2016-09-18
      • 2021-10-06
      • 2019-08-08
      • 2012-07-19
      • 1970-01-01
      相关资源
      最近更新 更多