【问题标题】:"3-way" Python subprocess pipeline: send stdout and stderr to two different processes?“3 路”Python 子流程管道:将标准输出和标准错误发送到两个不同的进程?
【发布时间】:2013-11-04 18:50:33
【问题描述】:

“标准”子流程管道技术(例如http://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline)能否“升级”为两个管道?

# How about
p1 = Popen(["cmd1"], stdout=PIPE, stderr=PIPE)
p2 = Popen(["cmd2"], stdin=p1.stdout)
p3 = Popen(["cmd3"], stdin=p1.stderr)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
p1.stderr.close()
#p2.communicate()  # or p3.communicate()?

好的,这实际上是一个不同的用例,但最近的起点似乎是管道示例。顺便说一句,“正常”管道中的 p2.communicate() 如何驱动 p1?这是供参考的正常管道:

# From Python docs
output=`dmesg | grep hda`
# becomes
p1 = Popen(["dmesg"], stdout=PIPE)
p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]

我想我最终对communicate() 支持什么样的“过程图”(或者可能只是树?)感兴趣,但我们将把一般情况留到另一天。

更新:这是基线功能。没有communicate(),创建2个线程从p1.stdout和p2.stdout读取。在主进程中,通过 p1.stdin.write() 注入输入。问题是我们是否可以只使用communicate() 来驱动一个1-source,2-sink 图

【问题讨论】:

标签: python subprocess pipe


【解决方案1】:

你可以使用 bash 的 process substitution:

from subprocess import check_call

check_call("cmd1 > >(cmd2) 2> >(cmd3)", shell=True, executable="/bin/bash")

它将cmd1 的标准输出重定向到cmd2,并将cmd1 的标准错误重定向到cmd3

如果您不想使用bash,那么您问题中的代码应该可以正常工作,例如:

#!/usr/bin/env python
import sys
from subprocess import Popen, PIPE
from textwrap import dedent

# generate some output on stdout/stderr
source = Popen([sys.executable, "-c", dedent("""
    from __future__ import print_function
    import sys
    from itertools import cycle
    from string import ascii_lowercase

    for i, c in enumerate(cycle(ascii_lowercase)):
        print(c)
        print(i, file=sys.stderr)
""")], stdout=PIPE, stderr=PIPE)

# convert input to upper case
sink = Popen([sys.executable, "-c", dedent("""
    import sys

    for line in sys.stdin:
        sys.stdout.write(line.upper())
""")], stdin=source.stdout)
source.stdout.close() # allow source to receive SIGPIPE if sink exits

# square input
sink_stderr = Popen([sys.executable, "-c", dedent("""
   import sys

   for line in sys.stdin:
       print(int(line)**2)
""")], stdin=source.stderr)
source.stderr.close() # allow source to receive SIGPIPE if sink_stderr exits

sink.communicate()
sink_stderr.communicate()
source.wait()

【讨论】:

  • 如果sink_stderr 填满底层操作系统管道,调用sink.communicate() 会不会死锁?这就是communicate() 函数的全部意义——如果需要,它会打开两个线程来读取stdoutstderr,避免阻塞
  • @dan3: .communicate() 不会做任何有趣的事情,除非使用PIPE(它用于sinksink_stderr 进程)。您可以在此处用简单的.wait() 调用来替换它。我使用.communicate() 来保持原始管道配方的连续性并支持更一般的情况。你可以使用Thread(target=sink_stderr.communicate, daemon=True).start()如果你使用stdout=PIPEstderr=PIPE作为sink_stderr否则没有必要。
  • 如果sink.communicate()(驱动source)导致source.stderrsink_stderr.stdin(尚未被读取)之间的PIPE 填满怎么办?这将阻止sink.communicate()
  • 您的回答中还有一个错字:source.stdout.close() 应改为 sink
  • @dan3: 1. source.stdout.close() 不是错字。查看原始管道配方。确保您了解为什么使用 p1.stdout.close() 2. sink = Popen(...)sink_stderr = Popen(...) 调用在文件描述符级别使用重定向(至少在 POSIX 上)。在sink.stdinsource.stderr 之间没有PIPEstdin=PIPEstdin=some_file_object 是完全不同的情况。正如我所说,在这种情况下,您甚至可以只使用 sink.wait()sink_stderr.wait() 调用而不是 .communicate()
【解决方案2】:

这里的解决方案是创建几个后台线程,它们从一个进程读取输出,然后将其写入多个进程的输入:

targets = [...] # list of processes as returned by Popen()
while True:
    line = p1.readline()
    if line is None: break
    for p in targets:
        p.stdin.write(line)

【讨论】:

  • 我可以有两个线程从 p2 和 p3 读取(在我的问题的示例中),并通过 p1.stdin.write() 简单地注入输入。我不是在寻找甚至 lower-tech 的东西 :)
猜你喜欢
  • 1970-01-01
  • 2013-07-16
  • 1970-01-01
  • 1970-01-01
  • 2019-05-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-14
相关资源
最近更新 更多