【问题标题】:How do I read the output of a child process without blocking in Rust?如何在 Rust 中不阻塞地读取子进程的输出?
【发布时间】:2016-04-09 06:44:52
【问题描述】:

我正在 Rust 中制作一个需要与子进程通信的小型 ncurses 应用程序。我已经有一个用 Common Lisp 编写的原型。我正在尝试重写它,因为 CL 为这么小的工具使用了大量的内存。

我在弄清楚如何与子流程交互时遇到了一些麻烦。

我现在做的大致是这样的:

  1. 创建流程:

    let mut program = match Command::new(command)
        .args(arguments)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => {
            println!("Cannot run program '{}'.", command);
            return;
        }
    };
    
  2. 将它传递给一个无限循环(直到用户退出),该循环像这样读取和处理输入并监听输出(并将其写入屏幕):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }
    

read_to_string 的调用会阻塞程序直到进程退出。从我可以看到read_to_endread 似乎也被阻止了。如果我尝试运行像 ls 这样的东西,它会立即退出,它可以工作,但是对于像 pythonsbcl 这样不会退出的东西,它只有在我手动终止子进程后才会继续。

基于this answer,我将代码改为使用BufReader

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }

但是,问题仍然存在。它将读取所有可用的行,然后阻塞。由于该工具应该适用于任何程序,因此在尝试读取之前无法猜测输出何时结束。似乎也没有办法为BufReader 设置超时。

【问题讨论】:

    标签: process io rust blocking pty


    【解决方案1】:

    默认情况下,流是阻塞的。 TCP/IP 流、文件系统流、管道流,它们都是阻塞的。当您告诉流给您一大块字节时,它会停止并等待,直到它具有给定的字节数或发生其他事情(interrupt,流结束,错误)。

    操作系统急于将数据返回到读取过程中,所以如果您只想等待下一行并在它进入时立即处理,那么Shepmaster在Unable to pipe to or from spawned child process more than once中建议的方法(和也在他的回答中)有效。
    虽然理论上它不必工作,因为允许操作系统让BufReader 等待read 中的更多数据,但实际上操作系统更喜欢早期的“短读取”而不是等待。

    当您需要处理多个流(例如子进程的stdoutstderr)或多个进程时,这种基于BufReader 的简单方法变得更加危险。例如,基于BufReader 的方法可能会在子进程等待您排空其stderr 管道而您的进程被阻塞等待它为空stdout 时出现死锁。

    同样,当您不希望您的程序无限期地等待子进程时,您不能使用BufReader。也许您想在孩子仍在工作并且不给您任何输出时显示进度条或计时器。

    如果您的操作系统碰巧不急于将数据返回给进程(更喜欢“完整读取”而不是“短读取”),则您不能使用基于 BufReader 的方法,因为在这种情况下,最后几行子进程打印的结果可能会出现在灰色区域:操作系统得到了它们,但它们还不够大,无法填满 BufReader 的缓冲区。

    BufReader 仅限于Read 接口允许它对流执行的操作,它的阻塞程度不亚于底层流。为了提高效率,它将read分块输入,告诉操作系统尽可能多地填充其可用的缓冲区。

    您可能想知道为什么在这里分块读取数据如此重要,为什么BufReader 不能逐字节读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们与它隔离工作,以免在我们的进程出现问题时弄乱它。因此,为了调用操作系统,需要转换到“内核模式”,这也可能导致“上下文切换”。这就是为什么调用操作系统来读取每个字节的成本很高的原因。我们希望尽可能少的操作系统调用,因此我们分批获取流数据。

    要在不阻塞的情况下等待流,您需要一个非阻塞流。 MIO promises to have the required non-blocking stream support for pipes,很可能是PipeReader,不过我还没查到。

    流的非阻塞性质应该可以读取块中的数据,而不管操作系统是否更喜欢“短读取”。因为非阻塞流永远不会阻塞。如果流中没有数据,它只会告诉您。

    在没有非阻塞流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,因此不会阻塞您的主线程。您可能还希望逐字节读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符做出反应。这是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78

    附:下面是一个允许通过共享字节向量监视程序标准输出的函数示例:

    use std::io::Read;
    use std::process::{Command, Stdio};
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    /// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
    fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
    where
        R: Read + Send + 'static,
    {
        let out = Arc::new(Mutex::new(Vec::new()));
        let vec = out.clone();
        thread::Builder::new()
            .name("child_stream_to_vec".into())
            .spawn(move || loop {
                let mut buf = [0];
                match stream.read(&mut buf) {
                    Err(err) => {
                        println!("{}] Error reading from stream: {}", line!(), err);
                        break;
                    }
                    Ok(got) => {
                        if got == 0 {
                            break;
                        } else if got == 1 {
                            vec.lock().expect("!lock").push(buf[0])
                        } else {
                            println!("{}] Unexpected number of bytes: {}", line!(), got);
                            break;
                        }
                    }
                }
            })
            .expect("!thread");
        out
    }
    
    fn main() {
        let mut cat = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("!cat");
    
        let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
        let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
        let mut stdin = match cat.stdin.take() {
            Some(stdin) => stdin,
            None => panic!("!stdin"),
        };
    }
    

    通过几个助手,我使用它来控制 SSH 会话:

    try_s! (stdin.write_all (b"echo hello world\n"));
    try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
    

    附:请注意,在 async-std 中的 read 调用中的 await 也是阻塞的。它只是阻塞系统线程而不是阻塞一个期货链(本质上是一个无堆栈的绿色线程)。 poll_read 是非阻塞接口。在 async-std#499 中,我询问了开发人员这些 API 是否提供短读保证。

    附:可能有a similar concern in Nom:“我们想告诉 IO 端根据解析器的结果重新填充(不完整与否)

    附:看看如何在 crossterm 中实现流读取可能会很有趣。对于 Windows,在poll.rs 中,它们使用的是本机WaitForMultipleObjects。在unix.rs 他们正在使用mio poll

    【讨论】:

    • 感谢您的帮助解释。我会研究 MIO,如果不行,我会使用单独的线程。
    【解决方案2】:

    东京的Command

    这里是一个使用tokio0.2的例子:

    use std::process::Stdio;
    use futures::StreamExt; // 0.3.1
    use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]
    
    #[tokio::main]
    async fn main() {
        let mut cmd = Command::new("/tmp/slow.bash")
            .stdout(Stdio::piped()) // Can do the same for stderr
            .spawn()
            .expect("cannot spawn");
    
        let stdout = cmd.stdout().take().expect("no stdout");
        // Can do the same for stderr
    
        // To print out each line
        // BufReader::new(stdout)
        //     .lines()
        //     .for_each(|s| async move { println!("> {:?}", s) })
        //     .await;
    
        // To print out each line *and* collect it all into a Vec
        let result: Vec<_> = BufReader::new(stdout)
            .lines()
            .inspect(|s| println!("> {:?}", s))
            .collect()
            .await;
    
        println!("All the lines: {:?}", result);
    }
    

    Tokio 线程池

    这是使用tokio 0.1 和tokio-threadpool 的示例。我们使用blocking 函数在线程中启动进程。我们使用stream::poll_fn 将其转换为流

    use std::process::{Command, Stdio};
    use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
    use tokio_threadpool; // 0.1.13
    
    fn stream_command_output(
        mut command: Command,
    ) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
        // Ensure that the output is available to read from and start the process
        let mut child = command
            .stdout(Stdio::piped())
            .spawn()
            .expect("cannot spawn");
        let mut stdout = child.stdout.take().expect("no stdout");
    
        // Create a stream of data
        stream::poll_fn(move || {
            // Perform blocking IO
            tokio_threadpool::blocking(|| {
                // Allocate some space to store anything read
                let mut data = vec![0; 128];
                // Read 1-128 bytes of data
                let n_bytes_read = stdout.read(&mut data).expect("cannot read");
    
                if n_bytes_read == 0 {
                    // Stdout is done
                    None
                } else {
                    // Only return as many bytes as we read
                    data.truncate(n_bytes_read);
                    Some(data)
                }
            })
        })
    }
    
    fn main() {
        let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
    
        let mut runtime = Runtime::new().expect("Unable to start the runtime");
    
        let result = runtime.block_on({
            output_stream
                .map(|d| String::from_utf8(d).expect("Not UTF-8"))
                .fold(Vec::new(), |mut v, s| {
                    print!("> {}", s);
                    v.push(s);
                    Ok(v)
                })
        });
    
        println!("All the lines: {:?}", result);
    }
    

    这里有很多可能的权衡。例如,总是分配 128 字节并不理想,但实现起来很简单。

    支持

    作为参考,这里是 slow.bash

    #!/usr/bin/env bash
    
    set -eu
    
    val=0
    
    while [[ $val -lt 10 ]]; do
        echo $val
        val=$(($val + 1))
        sleep 1
    done
    

    另见:

    【讨论】:

      【解决方案3】:

      如果 Unix 支持足够,您还可以将两个输出流设为非阻塞,并像在 TcpStream 上使用 set_nonblocking 函数那样轮询它们。

      Command spawn 返回的ChildStdoutChildStderrStdio(并包含一个文件描述符),您可以直接修改这些句柄的读取行为以使其成为非阻塞。

      基于jcreekmore/timeout-readwrite-rsanowell/nonblock-rs的工作,我使用这个包装器来修改流句柄:

      extern crate libc;
      use std::io::Read;
      use std::os::unix::io::AsRawFd;
      use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};
      
      fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
      where
          H: Read + AsRawFd,
      {
          let fd = handle.as_raw_fd();
          let flags = unsafe { fcntl(fd, F_GETFL, 0) };
          if flags < 0 {
              return Err(std::io::Error::last_os_error());
          }
          let flags = if nonblocking{
              flags | O_NONBLOCK
          } else {
              flags & !O_NONBLOCK
          };
          let res = unsafe { fcntl(fd, F_SETFL, flags) };
          if res != 0 {
              return Err(std::io::Error::last_os_error());
          }
          Ok(())
      }
      

      您可以像管理其他任何非阻塞流一样管理这两个流。以下示例基于polling crate,这使得处理读取事件非常容易,BufReader 用于行读取:

      use std::process::{Command, Stdio};
      use std::path::PathBuf;
      use std::io::{BufReader, BufRead};
      use std::thread;
      extern crate polling;
      use polling::{Event, Poller};
      
      fn main() -> Result<(), std::io::Error> {
          let path = PathBuf::from("./worker.sh").canonicalize()?;
      
          let mut child = Command::new(path)
              .stdin(Stdio::null())
              .stdout(Stdio::piped())
              .stderr(Stdio::piped())
              .spawn()
              .expect("Failed to start worker");
      
          let handle = thread::spawn({
              let stdout = child.stdout.take().unwrap();
              set_nonblocking(&stdout, true)?;
              let mut reader_out = BufReader::new(stdout);
      
              let stderr = child.stderr.take().unwrap();
              set_nonblocking(&stderr, true)?;
              let mut reader_err = BufReader::new(stderr);
      
              move || {
                  let key_out = 1;
                  let key_err = 2;
                  let mut out_closed = false;
                  let mut err_closed = false;
      
                  let poller = Poller::new().unwrap();
                  poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                  poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();
      
                  let mut line = String::new();
                  let mut events = Vec::new();
                  loop {
                      // Wait for at least one I/O event.
                      events.clear();
                      poller.wait(&mut events, None).unwrap();
      
                      for ev in &events {
                          // stdout is ready for reading
                          if ev.key == key_out {
                              let len = match reader_out.read_line(&mut line) {
                                  Ok(len) => len,
                                  Err(e) => {
                                      println!("stdout read returned error: {}", e);
                                      0
                                  }
                              };
                              if len == 0 {
                                  println!("stdout closed (len is null)");
                                  out_closed = true;
                                  poller.delete(reader_out.get_ref()).unwrap();
                              } else {
                                  print!("[STDOUT] {}", line);
                                  line.clear();
                                  // reload the poller
                                  poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                              }
                          }
      
                          // stderr is ready for reading
                          if ev.key == key_err {
                              let len = match reader_err.read_line(&mut line) {
                                  Ok(len) => len,
                                  Err(e) => {
                                      println!("stderr read returned error: {}", e);
                                      0
                                  }
                              };
                              if len == 0 {
                                  println!("stderr closed (len is null)");
                                  err_closed = true;
                                  poller.delete(reader_err.get_ref()).unwrap();
                              } else {
                                  print!("[STDERR] {}", line);
                                  line.clear();
                                  // reload the poller
                                  poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
                              }
                          }
                      }
      
                      if out_closed && err_closed {
                          println!("Stream closed, exiting process thread");
                          break;
                      }
                  }
              }
          });
      
          handle.join().unwrap();
          Ok(())
      }
      

      此外,与 EventFd 上的包装器一起使用,可以轻松地从另一个线程停止进程,而无需阻塞或主动轮询,并且仅使用单个线程。

      编辑:似乎在我的测试之后,轮询板条箱自动将轮询句柄设置为非阻塞模式。如果您想直接使用 nix::poll 对象,set_nonblocking 函数仍然很有用。

      【讨论】:

        【解决方案4】:

        我遇到了足够多的用例,在这些用例中,通过行分隔的文本与子进程交互很有用,我为此编写了一个 crate,interactive_process

        我希望原来的问题早就解决了,但我认为它可能对其他人有帮助。

        【讨论】:

          猜你喜欢
          • 2018-06-23
          • 2017-12-30
          • 2014-03-29
          • 2016-07-28
          • 1970-01-01
          • 2018-08-10
          • 1970-01-01
          • 1970-01-01
          • 2011-09-18
          相关资源
          最近更新 更多