【问题标题】:how to concatenate multiple files for stdin of Popen如何为 Popen 的标准输入连接多个文件
【发布时间】:2011-08-27 21:14:39
【问题描述】:

我正在将一个 bash 脚本移植到 python 2.6,并且想要替换一些代码:

cat $( ls -tr xyz_`date +%F`_*.log ) | filter args > bzip2

我想我想要类似于http://docs.python.org/release/2.6/library/subprocess.html 的“替换 shell 管道”示例,唉...

p1 = Popen(["filter", "args"], stdin=*?WHAT?*, stdout=PIPE)
p2 = Popen(["bzip2"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]

但是,我不确定如何最好地提供p1stdin 值,以便连接输入文件。看来我可以补充...

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = ... stdin=p0.stdout ...

...但这似乎超出了使用(缓慢、低效)管道来调用具有重要功能的外部程序的范围。 (任何体面的 shell 都会在内部执行 cat。)

所以,我可以想象一个满足文件对象 API 要求的自定义类,因此可以用于 p1 的标准输入,连接任意其他文件对象。 (编辑:现有答案解释了为什么这是不可能的

python 2.6 是否有一种机制可以满足这种需求/想要,或者在 python 圈子中是否可以认为另一个 Popencat 非常好?

谢谢。

【问题讨论】:

  • filter 是做什么的?您是否需要为此功能调用外部程序?
  • @Sven: filter 是一个几百行的 C++ 程序,处理大约 10GB 的输入……重写有点多,而且以 C++ 的速度进行快速编辑/测试很方便回转。也就是说,在一份不平凡的工作中多加一个Popen/cat 并不是什么大问题,只是感觉很草率:-}。

标签: python pipe concatenation popen


【解决方案1】:

您可以用 Python 代码替换您正在执行的所有操作,但您的外部实用程序除外。这样,只要您的外部工具是可移植的,您的程序就会保持可移植性。您还可以考虑将 C++ 程序转换为库并使用 Cython 与其交互。正如 Messa 所示,date 被替换为 time.strftime,通配符由 glob.glob 完成,cat 可以替换为读取列表中的所有文件并将它们写入程序的输入。对bzip2 的调用可以替换为bz2 模块,但这会使您的程序复杂化,因为您必须同时读取和写入。为此,您需要使用 p.communicate 或在数据量很大时使用线程(select.select 会是更好的选择,但它不适用于 Windows)。

import sys
import bz2
import glob
import time
import threading
import subprocess

output_filename = '../whatever.bz2'
input_filenames = glob.glob(time.strftime("xyz_%F_*.log"))
p = subprocess.Popen(['filter', 'args'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = open(output_filename, 'wb')
output_compressor = bz2.BZ2Compressor()

def data_reader():
    for filename in input_filenames:
        f = open(filename, 'rb')
        p.stdin.writelines(iter(lambda: f.read(8192), ''))
    p.stdin.close()

input_thread = threading.Thread(target=data_reader)
input_thread.start()

with output:
    for chunk in iter(lambda: p.stdout.read(8192), ''):
        output.write(output_compressor.compress(chunk))

    output.write(output_compressor.flush())

input_thread.join()
p.wait()

补充:如何检测文件输入类型

您可以使用 libmagic 的文件扩展名或 Python 绑定来检测文件的压缩方式。这是一个代码示例,它同时执行这两种操作,并自动选择magic(如果可用)。您可以选择适合您需要的部分,并根据您的需要进行调整。 open_autodecompress 应该检测 mime 编码并使用适当的解压缩器打开文件(如果可用)。

import os
import gzip
import bz2
try:
    import magic
except ImportError:
    has_magic = False
else:
    has_magic = True


mime_openers = {
    'application/x-bzip2': bz2.BZ2File,
    'application/x-gzip': gzip.GzipFile,
}

ext_openers = {
    '.bz2': bz2.BZ2File,
    '.gz': gzip.GzipFile,
}


def open_autodecompress(filename, mode='r'):
    if has_magic:
        ms = magic.open(magic.MAGIC_MIME_TYPE)
        ms.load()
        mimetype = ms.file(filename)
        opener = mime_openers.get(mimetype, open)
    else:
        basepart, ext = os.path.splitext(filename)
        opener = ext_openers.get(ext, open)
    return opener(filename, mode)

【讨论】:

  • 嗨,罗什。再次感谢这个 - 优雅而简洁的代码。尽管如此,我发现我在一些主机上的一些输入在当天早些时候被压缩(gzip 或 bzip2),所以在管道中使用bzcat --force 实际上很方便简洁(尽管我确信还有一些调整您在上面使用的 bz2 库也可以在输入端满足此要求)。但是——压垮骆驼的稻草等等。我稍后会出于自己的兴趣尝试一下,但更愿意为这种生产用途提供一些非常简单的东西。希望其他人会发现这个问题/答案很有用。干杯。
  • 我添加了一个示例,说明如何在读取时检测输入文件的文件类型,如果您觉得它有用的话。
【解决方案2】:

如果您查看 subprocess 模块实现的内部,您将看到 std{in,out,err} 应该是支持 fileno() 方法的文件对象,因此是一个简单的带有 python 接口的类文件对象(或即使是 StringIO 对象)也不适合这里。

如果是迭代器,而不是文件对象,你可以使用itertools.chain

当然,牺牲内存消耗你可以这样做:

import itertools, os

# ...

files = [f for f in os.listdir(".") if os.path.isfile(f)]
input = ''.join(itertools.chain(open(file) for file in files))
p2.communicate(input)

【讨论】:

  • 感谢fileno() 的解释。有这么多输入(~10GB),内存连接可能不合适,但感谢您说明该技术。所以,我更了解什么不该尝试!干杯。
  • @Tony,我的意见:如果你有一个大文件,就用 Popen(cat)。当然,你可以在 python 中重新实现它,但是有什么用呢?无论如何这将是一个单独的过程(如果您想将其输出自动提供给 Popen 对象),那么为什么不使用标准工具呢?
  • 很有趣...我还看不到这样做的方法,所以我不清楚是否有方法会涉及一个单独的过程。但是,无论如何,我对Popen(["cat"...]) 感觉越来越舒服......对于我的特定应用程序来说没什么大不了的,但我发现很难想象一般来说外部过程将是最佳实践。从好的方面来说,Popen(["cat"...) 简洁易懂....
  • @Tony,这确实不是最有效的方法,但 subprocess.Popen 对象没有为您的案例提供有效的解决方案提供方便的 API。检查subprocess 模块源,实现communicate() 方法。要获得有效的解决方案,您需要重新实现它,用返回搅拌的迭代器替换其字符串 input 参数。
【解决方案3】:

使用子进程时,您必须考虑这样一个事实,即 Popen 内部将使用文件描述符(处理程序)并在将它们传递给创建的子进程之前为 stdin、stdout 和 stderr 调用 os.dup2()。

所以如果你不想在 Popen 中使用系统 shell 管道:

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = Popen(["filter", "args"], stdin=p0.stdout, stdout=PIPE)

...

我认为您的另一个选择是在 python 中编写一个 cat 函数并以类似 cat 的方式生成一个 file 并将这个 file 传递给 p1 stdin,不要考虑一个实现 io API 的类,因为它不会像我说的那样工作,因为在内部子进程只会获取文件描述符。

话虽如此,我认为您更好的选择是使用 unix PIPE 方式,例如 subprocess doc

【讨论】:

  • 我和你在一起直到最后一行......“第一个解决方案”是指你在上面显示为“微不足道的解决方案”的猫的 Popen 吗?您提供的链接讨论了关闭 stdout 对于早期管道的重要性......我错过了,所以谢谢!我开始认为这可能会尽可能好,但会让问题悬而未决......干杯。
  • @Tony:是的,“第一个解决方案”我指的是“微不足道的解决方案”:),但是在您发表评论后,我认为将“微不足道的解决方案”称为管道解决方案:)。已编辑
  • 顺便说一句/我不应该真正创建一个带有连接输出的临时文件,因为我想确保即使磁盘上的可用空间最小,程序也能运行(这太常见了,似乎都是大银行有些人的工作是将磁盘使用量限制在令人沮丧的愚蠢数量,即使在生产机器上也是如此:-/)。
  • @Tony: 是的,这是使用 cat 和 PIPE 的另一个原因 :)
【解决方案4】:

这应该很容易。首先,使用os.pipe 创建一个管道,然后以管道的读取端作为标准输入打开filter。然后对于目录中名称与模式匹配的每个文件,只需将其内容传递到管道的写入端。这应该与 shell 命令cat ..._*.log | filter args 所做的完全相同。

更新:对不起,来自os.pipe 的管道不需要,我忘记了subprocess.Popen(..., stdin=subprocess.PIPE) 实际上为你创建了一个。另外一个管道不能塞满太多数据,只有在读取之前的数据后才能将更多数据写入管道。

所以解决方案(例如wc -l)将是:

import glob
import subprocess

p = subprocess.Popen(["wc", "-l"], stdin=subprocess.PIPE)

processDate = "2011-05-18"  # or time.strftime("%F")
for name in glob.glob("xyz_%s_*.log" % processDate):
    f = open(name, "rb")
    # copy all data from f to p.stdin
    while True:
        data = f.read(8192)
        if not data:
            break  # reached end of file
        p.stdin.write(data)
    f.close()

p.stdin.close()
p.wait()

使用示例:

$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_a.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_b.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_c.log 
$ ./example.py 
   30000

【讨论】:

  • 感谢 Messa - 会试一试,虽然我担心如果我在调用 p2.communicate()[0] 之前尝试将所有 ~10GB 输入写入管道,它会在 64 KB 后阻塞或失败或任何缓冲区大小。也许在communicate 运行时我需要另一个线程来喂管道?会看看情况如何。感谢探索之路。
  • 我一直在玩这个:stackoverflow.com/questions/163542/… 的公认答案似乎证实了我的观察结果 - 过滤器必须具有 stdin=subprocess.PIPE 而不是从 os.pipe() 返回的可读 fd。奇怪的是,我只能在communicate() 介于 filter_pipe.stdin.write(...) 和....close() 之间,这表明communicate() 是非阻塞的。这似乎排除了线程方法 - communicate() 不会等到另一个线程完成写入。内存数据太多......
  • 您可以将其缩短为for chunk in iter(lambda: f.read(8192), ''): p.stdin.write(chunk)p.stdin.writelines(iter(lambda: f.read(8192), ''))
  • @Messa, @Rosh:你们的两个解决方案看起来都非常有前途 - 我会在早上检查它们,然后报告它们是否适用于相关输入量。感谢你们的时间和精力。
  • @Messa,这并不容易,如果您打开的子进程创建一个不以自动“写入”数据的文件描述符终止的大输出(例如是管道),它肯定会阻塞.证明:gist.github.com/979549。这种情况需要更复杂的机器,带有螺纹或类似的东西,以便在写入输入管道的同时排空输出管道的另一端。检查subprocess.py 中的communicate() 实现。
猜你喜欢
  • 2018-08-14
  • 1970-01-01
  • 2018-12-13
  • 2014-01-01
  • 1970-01-01
  • 2010-09-21
  • 2013-03-10
  • 2020-02-06
  • 1970-01-01
相关资源
最近更新 更多