【Posted】: 2014-01-26 05:45:21
【Question】:
I have three commands that can easily be chained together on the command line, like so:
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
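In other words, firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which does its own processing and redirects its output to the file finalOutput.
I have been trying to reproduce this in a Python script, using threads. I'd like to use Python to manipulate the output from firstCommand before passing it to secondCommand, and then again between secondCommand and thirdCommand.
Here is an excerpt of code that does not seem to work: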
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.communicate()
second_process.communicate()
third_process.communicate()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
When I run this, the script hangs:
$ echo foo | ./myConversionScript.py
** hangs here... **
If I hit Ctrl-C to terminate the script, the traceback shows that the code is stuck on the third_thread.join() line:
C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt
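If I leave out third_process and third_thread, and only pass data from the first thread's output to the second thread's input, there is no hang.
Something about the third thread seems to break things, but I don't know why.
I thought the point of communicate() was that it would handle the I/O for the three processes, so I'm not sure why there is an I/O hang.
How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?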
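For reference, communicate() serves a single process: it writes the given input to that process's stdin, closes stdin, reads stdout until EOF, and then waits for exit. The sketch below shows that behavior on one child only. It is written for Python 3 (the question targets 2.7), and the `UPPER` command is a hypothetical stand-in for a real binary, not one of the commands from the question.

```python
import subprocess
import sys

# Hypothetical stand-in for a real command: a child Python process
# that upper-cases whatever it reads from standard input.
UPPER = [sys.executable, "-c",
         "import sys; sys.stdout.write(sys.stdin.read().upper())"]

def run_once(data):
    """Feed data to one child with communicate() and return (output, exit code)."""
    proc = subprocess.Popen(UPPER, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() writes data, closes stdin (so the child sees EOF),
    # reads stdout to EOF, and waits for the child to terminate.
    out, _ = proc.communicate(data)
    return out, proc.returncode

if __name__ == "__main__":
    print(run_once(b"foo\n"))
```

Because each call handles only its own process, calling communicate() on three processes one after another, after the relay threads have already been joined, does not coordinate the I/O between them.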
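Update
OK, I made some changes that seem to help, based on some comments here and on other sites. The processes are now made to wait() for completion, and within the thread methods I close() the pipes once the thread has processed all the data it can. I'm concerned that memory usage will be high for large datasets, but at least things are working: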
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.wait()
second_process.wait()
third_process.wait()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
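The close() calls matter because a process reading from a pipe only sees end-of-file once every write end of that pipe has been closed. A minimal sketch of that idea with a single child follows, in Python 3. The `relay` and `demo` helpers and the `CAT` command are hypothetical names made up for this illustration. `close_fds=True` is passed explicitly because Python 2.7 defaults it to False on POSIX, which lets later children inherit earlier pipe ends; whether that contributed to the original hang is an assumption.

```python
import io
import subprocess
import sys
import threading

# Hypothetical stand-in for a command: copies stdin to stdout once stdin hits EOF.
CAT = [sys.executable, "-c",
       "import sys; sys.stdout.write(sys.stdin.read())"]

def relay(from_stream, to_stream, transform):
    """Copy lines through transform, then close to_stream to signal EOF."""
    try:
        for line in iter(from_stream.readline, b""):
            to_stream.write(transform(line))
    finally:
        # Without this close, the child keeps waiting for more input.
        to_stream.close()

def demo(lines):
    proc = subprocess.Popen(CAT, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, close_fds=True)
    source = io.BytesIO(b"".join(lines))
    writer = threading.Thread(target=relay,
                              args=(source, proc.stdin, bytes.upper))
    writer.start()
    out = proc.stdout.read()  # returns once the child closes its stdout
    writer.join()
    proc.stdout.close()
    proc.wait()
    return out

if __name__ == "__main__":
    print(demo([b"a\n", b"b\n"]))
```

If the `to_stream.close()` line is removed, the child in this sketch never receives EOF and `proc.stdout.read()` blocks, which resembles the hang described above.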
【Comments】:
-
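Thanks, J.F. Sebastian. However, the solution in your first comment does not work: the binaries behave as if they received no arguments. The second comment covers the use of two binaries, not the three in my question. As I said, I can get two binaries working with the threading arrangement I described above. I'm still trying to see how the third comment relates to my question.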
-
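I thought my question was clear, but I'll try to clarify: I want to run that command, but I need to capture chunks of output from one command, process them, and then send them on to the next command. So it would be something like this:
echo foo | firstCommand - | somePythonRoutine - | secondCommand - | anotherPythonRoutine - | thirdCommand - > finalOutput. The idea is to do all of the I/O work within a single Python script, rather than through several smaller scripts, and to process the data through stdin/stdout without using temporary files to store intermediate results. Hope this helps.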
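One way to picture that arrangement is one thread per hop, each copying lines from one process's stdout into the next process's stdin through a Python function. The sketch below is Python 3, not the asker's actual code. `run_chain`, `relay`, and `py` are hypothetical helper names, and the child commands are stand-ins for firstCommand, secondCommand, and thirdCommand.

```python
import subprocess
import sys
import threading

def py(code):
    """Build a hypothetical stand-in command that runs a Python one-liner."""
    return [sys.executable, "-c", code]

def relay(src, dst, transform):
    """Copy lines from src to dst through transform; close dst at EOF."""
    try:
        for line in iter(src.readline, b""):
            dst.write(transform(line))
    finally:
        dst.close()

def run_chain(commands, transforms, data):
    """Run commands as a pipeline, applying transforms[i] between stage i and i+1."""
    procs = [subprocess.Popen(c, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE, close_fds=True)
             for c in commands]

    def feed():
        # Write the initial input, then close so the first stage sees EOF.
        try:
            procs[0].stdin.write(data)
        finally:
            procs[0].stdin.close()

    threads = [threading.Thread(target=feed)]
    for up, down, fn in zip(procs, procs[1:], transforms):
        threads.append(threading.Thread(target=relay,
                                        args=(up.stdout, down.stdin, fn)))
    for t in threads:
        t.start()
    out = procs[-1].stdout.read()  # drain the last stage in the main thread
    for t in threads:
        t.join()
    for p in procs:
        p.stdout.close()
        p.wait()
    return out

if __name__ == "__main__":
    copy = "import sys; sys.stdout.write(sys.stdin.read())"
    upper = "import sys; sys.stdout.write(sys.stdin.read().upper())"
    result = run_chain(
        [py(upper), py(copy), py(copy)],
        [lambda line: b"1:" + line,
         lambda line: line.replace(b"FOO", b"BAR")],
        b"foo\n",
    )
    print(result)
```

The final stage is read in the main thread here and returned as bytes; to match the shell pipeline, the last Popen could instead be given stdout=sys.stdout or an open file, at the cost of no longer capturing the result.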
Tags: python multithreading python-2.7 subprocess python-multithreading