【问题标题】:Concatenating large files, piping, and a bonus连接大文件、管道和奖励
【发布时间】:2012-03-27 01:48:57
【问题描述】:

有类似的问题被问过(并得到了回答),但从未真正在一起,而且我似乎无法解决任何问题。由于我刚开始使用 Python,所以一些易于理解的东西会很棒!

我有 3 个大型数据文件 (>500G) 需要解压缩、连接、将其传送到子进程,然后将输出传送到另一个子进程。然后我需要处理我想在 Python 中执行的最终输出。注意我不需要解压缩和/或连接的文件,除了处理 - 创建一个我认为会浪费空间的文件。这是我到目前为止所拥有的......

import gzip
from subprocess import Popen, PIPE

#zipped files
zipfile1 = "./file_1.txt.gz"   
zipfile2 = "./file_2.txt.gz"  
zipfile3 = "./file_3.txt.gz"


# Open the first pipe
p1 = Popen(["dataclean.pl"], stdin=PIPE, stdout=PIPE)

# Unzip the files and pipe them in (has to be a more pythonic way to do it - 
# if this is even correct)
unzipfile1 = gzip.open(zipfile1, 'wb')
p1.stdin.write(unzipfile1.read())
unzipfile1.close()

unzipfile2 = gzip.open(zipfile2, 'wb')
p1.stdin.write(unzipfile2.read())
unzipfile2.close()

unzipfile3 = gzip.open(zipfile3, 'wb')
p1.stdin.write(unzipfile3.read())
unzipfile3.close()


# Pipe the output of p1 to p2
p2 = Popen(["dataprocess.pl"], stdin=p1.stdout, stdout=PIPE)

# Not sure what this does - something about a SIGPIPE
p1.stdout.close()

## Not sure what this does either - but it is in the pydoc
output = p2.communicate()[0]

## more processing of p2.stdout...
print p2.stdout

任何建议将不胜感激。 *作为一个额外的问题...... read() 的 pydoc 是这样说的:

“另请注意,在非阻塞模式下,即使没有给出大小参数,返回的数据也可能少于请求的数据。”

这看起来很可怕。任何人都可以解释它吗?我不想只阅读数据集的一部分,认为它就是全部。我认为保留文件的大小是一件好事,尤其是当我不知道文件的大小时。

谢谢,

GK

【问题讨论】:

  • 您确定要使用 Python 处理超过 TB 的数据吗?解压缩、连接和管道处理都在 shell 脚本或批处理文件中。
  • 我会尽量避免一次加载那么多数据。你到底想用这些数据做什么?您可能可以使用一系列生成器来完成此操作。
  • 目前它是通过 bash 脚本完成的,调用 perl 脚本进行一些数据清理,然后调用 C++ 脚本进行一些分析(非常大的 fMRI 文件)。我试图向原始 bash 脚本添加更多功能,但它变得有点冗长乏味。我想我会给 python 一个机会。听起来这是个坏主意?
  • 不要担心非阻塞模式——你会知道你是否使用它,因为它需要一种非常不同的编程风格。

标签: python python-2.7


【解决方案1】:

首先要做的事;我认为你的模式不正确:

unzipfile1 = gzip.open(zipfile1, 'wb')

这个should open zipfile1 for writing,不是在读。我希望您的数据仍然存在。

其次,您不想尝试一次处理所有数据。您应该使用 16k 或 32k 或其他块的数据。 (最佳大小会因许多因素而异;如果必须多次执行此任务,请使其可配置,以便您可以对不同大小进行计时。)

您正在寻找的可能更像是这个未经测试的伪代码:

while (block = unzipfile1.read(4096*4)):
    p1.stdin.write(a)

如果您尝试在 Python 中将多个进程连接到管道中,那么它可能看起来更像这样:

while (block = unzipfile1.read(4096*4)):
    p1.stdin.write(a)
    p2.stdin.write(p1.stdout.read())

这会尽快将p1 的输出提供给p2。我假设p1 不会产生比给定更多的输入。如果p1 的输出是输入的十倍,那么你应该再做一个类似的循环。


但是,我不得不说,复制 shell 脚本感觉需要做很多额外的工作:

gzip -cd file1.gz file2.gz file3.gz | dataclean.py | dataprocess.pl

gzip(1) 将自动处理块大小的数据传输,如上所述,假设您的 dataclean.pydataprocess.pl 脚本处理块中的数据而不是执行完整读取(就像您的该脚本的原始版本所做的那样),那么它们应该尽可能地并行运行。

【讨论】:

  • 我想让 Python 做的大部分事情是选择我想要的某些文件(取决于星期几、文件可用性等)。此外,想要调用不同的处理脚本,这取决于我想要做什么,以及文件的可用性。我想我可以用 Python 完成这一切,创建一个字符串,然后调用 os.system()。这会是一个更好的主意吗?
  • 如果os.system()you meant subprocess.call(),那么是的。 ;) 这是一种明智的做法。
  • 如果您要使用脚本做更多事情,那么是的,这很有意义。 Li-Aung 使用subprocess.call() 的建议非常值得关注。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-22
  • 2016-02-19
  • 2012-07-29
  • 1970-01-01
  • 2019-01-07
相关资源
最近更新 更多