【问题标题】:Streaming read from subprocess从子进程流式读取
【发布时间】:2019-05-26 17:09:45
【问题描述】:

我需要在子进程生成时读取它的输出——也许不是在每个write 上,而是在进程完成之前。我已经尝试了 Python3 文档和 SO 问题 herehere 的解决方案,但在孩子终止之前我仍然一无所获。

该应用程序用于监控深度学习模型的训练。我需要获取测试输出(每次迭代大约 250 个字节,大约每 1 分钟间隔一次)并观察统计故障。

  • 我无法更改训练引擎;例如,我不能在子进程代码中插入stdout.flush()
  • 我可以合理地等待十几行输出累积;我希望缓冲区填充能够解决我的问题。

代码:变体被注释掉。

父母

cmd = ["/usr/bin/python3", "zzz.py"]
# test_proc = subprocess.Popen(
test_proc = subprocess.run(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT
    )

out_data = ""
print(time.time(), "START")
while not "QUIT" in str(out_data):
    out_data = test_proc.stdout
    # out_data, err_data = test_proc.communicate()
    print(time.time(), "MAIN received", out_data)

孩子 (zzz.py)

from time import sleep
import sys

for _ in range(5):
    print(_, "sleeping", "."*1000)
    # sys.stdout.flush()
    sleep(1)

print("QUIT this exercise")

尽管发送了超过 1000 字节的行,但缓冲区(在其他地方测试为 2kb;在这里,我已经达到了 50kb)填充不会导致父级“看到”新文本。

让这个工作我缺少什么?


关于链接、cmets 和iBug 发布的答案的更新:

  • Popen 而不是 run 修复了阻塞问题。不知何故,我在文档和我对两者的实验中都错过了这一点。
  • universal_newline=True 巧妙地将字节返回更改为字符串:在接收端更易于处理,尽管有交错的空行(易于检测和丢弃)。
  • bufsize 设置为微小的东西(例如1)没有任何影响;父母仍然需要等待孩子填充stdout 缓冲区,在我的情况下为 8k。
  • export PYTHONUNBUFFERED=1 在执行之前确实修复了缓冲问题。感谢 wim 提供链接。

除非有人提出一个规范的、漂亮的解决方案使这些过时,否则我明天会接受 iBug 的回答。

【问题讨论】:

  • 它是否必须与从流中读取一起工作?你用的是什么框架?例如。在 TF/Keras 中,您可以传递回调来处理您想要查看的所有报告;也许这可能是一个不错的选择。
  • 我需要这样做一次 - 您是否尝试过 here 的 jfs 解决方案? (不是公认的答案)
  • @KostasMouratidis:我很了解 Keras;它不是测试环境的一部分。这只是TF。我相信我可以将代码插入到迭代循环中,但这不符合我们的目的。
  • 感谢您的指点和解决方案;将在接下来的几个小时内尝试它们。真正的日子正在干扰......

标签: python-3.x subprocess


【解决方案1】:

subprocess.run 总是产生子进程,并阻塞线程直到它退出

您唯一的选择是使用p = subprocess.Popen(...) 并读取带有s = p.stdout.readline()p.stdout.__iter__() 的行(见下文)。

此代码适用于我,如果子进程在打印一行后刷新标准输出(请参阅下面的扩展说明)。

cmd = ["/usr/bin/python3", "zzz.py"]
test_proc = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT
)

out_data = ""
print(time.time(), "START")
while not "QUIT" in str(out_data):
    out_data = test_proc.stdout.readline()
    print(time.time(), "MAIN received", out_data)
test_proc.communicate()  # shut it down

查看我的终端日志(从zzz.py 删除的点):

ibug@ubuntu:~/t $ python3 p.py
1546450821.9174328 START
1546450821.9793346 MAIN received b'0 sleeping \n'
1546450822.987753 MAIN received b'1 sleeping \n'
1546450823.993136 MAIN received b'2 sleeping \n'
1546450824.997726 MAIN received b'3 sleeping \n'
1546450825.9975247 MAIN received b'4 sleeping \n'
1546450827.0094354 MAIN received b'QUIT this exercise\n'

您也可以使用for 循环:

for out_data in test_proc.stdout:
    if "QUIT" in str(out_data):
        break
    print(time.time(), "MAIN received", out_data)

如果您无法修改子进程,unbuffer(来自包 expect - 使用 APT 或 YUM 安装)可能会有所帮助。这是我的工作父代码没有更改子代码。

test_proc = subprocess.Popen(
    ["unbuffer"] + cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT
)

【讨论】:

  • 不完全;我无法更改子进程,即使插入flush。我将通过等待缓冲区填充(“自然”刷新)来尝试这个,就像我在其他测试中所做的那样。另一方面,Popen 看起来也很有希望——我的尝试让我走上了同样的死胡同。
  • 您的最后一段与我看到的功能不符。我确实得到了字符串返回,但它以大约 8k 的突发速度出现,而不是每秒读取一次。我之前确实尝试过 bufsize=1 ,它似乎对父进程接收的“突发大小”没有影响。
  • @Prune 对不起,这是一个错误。我已经更新了我的答案
  • 谢谢;哇!我主要担心的是功能不同步。
  • unbuffer 会很好,但我必须使用 PYTHONUNBUFFERED 环境变量来执行此操作。应用程序特性使得轻松添加另一个模块变得困难。
猜你喜欢
  • 2010-10-27
  • 1970-01-01
  • 2016-03-26
  • 2015-12-27
  • 2012-10-06
  • 2017-02-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多