【问题标题】:How to thread multiple subprocess instances in Python 2.7?如何在 Python 2.7 中线程化多个子进程实例?
【发布时间】:2014-01-26 05:45:21
【问题描述】:

我有三个命令,否则它们很容易在命令行上链接在一起,如下所示:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

换句话说,firstCommand 处理来自标准输入的foo,并将结果通过管道传送到secondCommand,后者又处理输入并将其输出传送到thirdCommand,后者进行处理并将其输出重定向到文件finalOutput

我一直试图在 Python 脚本中使用线程来概括这一点。我想使用 Python 来处理来自 firstCommand 的输出,然后再将其传递给 secondCommand,然后再传递给 secondCommandthirdCommand

这是一段似乎不起作用的代码摘录:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

当我运行它时,脚本挂起:

$ echo foo | ./myConversionScript.py
** hangs here... **

如果我点击Ctrl-C 终止脚本,代码就会卡在third_thread.join() 行:

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

如果我不使用third_processthird_thread,而是仅将数据从第一个线程的输出传递到第二个线程的输入,则不会挂起。

关于第三个线程的某些东西似乎导致事情中断,但我不知道为什么。

我认为communicate() 的意义在于它将处理三个进程的 I/O,所以我不确定为什么会出现 I/O 挂起。

如何让三个或更多命令/进程一起工作,其中一个线程消耗另一个线程/进程的输出?

更新

好的,我根据此处和其他网站上的一些 cmets 进行了一些似乎有帮助的更改。进程将完成wait() 以完成,并且在线程方法中,一旦线程处理了它可以处理的所有数据,我close() 管道。我担心大型数据集的内存使用率会很高,但至少一切正常:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

【问题讨论】:

  • 谢谢,J.F.塞巴斯蒂安。但是,您的第一条评论中的解决方案不起作用 - 二进制文件的行为就像它们没有收到任何参数一样。第二条评论反映了使用两个二进制文件,而不是我的问题中的三个。正如我所说,我可以使用两个二进制文件和我上面描述的线程安排。我仍在努力查看第三条评论与我的问题的关系。
  • 我认为我的问题很清楚,但我会尝试澄清:我想运行该命令,但我需要从一个命令捕获输出块,处理它们,然后将它们发送到下一个命令。所以它会是这样的:echo foo | firstCommand - | somePythonRoutine - | secondCommand - | anotherPythonRoutine - | thirdCommand - &gt; finalOutput。这个想法是完全在一个 Python 脚本中完成所有 I/O 工作,而不是通过几个较小的脚本,并且通过使用 stdin/stdout 来处理数据而不使用临时文件来存储中间结果。希望这会有所帮助。

标签: python multithreading python-2.7 subprocess python-multithreading


【解决方案1】:

模仿:

echo foo |
firstCommand - | somePythonRoutine - |
secondCommand - | anotherPythonRoutine - |
thirdCommand - > finalOutput

您当前使用线程的方法有效:

from subprocess import Popen, PIPE

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
bind(first.stdout, second.stdin, somePythonRoutine)
with open("finalOutput", "wb") as file:
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1)
bind(second.stdout, third.stdin, anotherPythonRoutine)

# provide input for the pipeline
first.stdin.write(b"foo")
first.stdin.close()

# wait for it to complete
pipestatus = [p.wait() for p in [first, second, third]]

每个bind() 启动一个新线程:

from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line) # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True # die if the program exits
    t.start()

somePythonRoutineanotherPythonRoutine 接受单行并返回(可能已修改)。

【讨论】:

    【解决方案2】:

    communicate() 的重点是它返回进程的输出。这与您的管道设置冲突。

    您应该只在第三个进程上调用一次;所有其他的都通过管道连接并且知道如何相互通信 - 无需其他/手动干预。

    【讨论】:

    • 感谢您的回答。很遗憾,您建议的更改无效。
    • 下一步是在您的代码中添加日志记录,以查看哪个线程实际在工作以及它可能挂起的位置。
    猜你喜欢
    • 2013-01-10
    • 2017-05-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多