【问题标题】:How to implement blocking iterator over stdin?如何在标准输入上实现阻塞迭代器?
【发布时间】:2021-12-07 00:39:58
【问题描述】:

我需要实现一个通过标准输入接收消息的长时间运行的程序。该协议定义消息采用长度指示符的形式(为简单起见,1 字节整数),然后是长度指示符表示的长度字符串。消息不被任何空格分隔。 该程序预计会使用来自标准输入的所有消息并等待其他消息。

如何在标准输入上实现这种等待?

我以尝试从标准输入读取并在出错时重复的方式实现迭代器。它有效,但效率非常低。 我希望迭代器在新数据到来时读取消息。

我的实现是使用read_exact:

use std::io::{Read, stdin, Error as IOError, ErrorKind};

pub struct In<R>(R) where R: Read;

pub trait InStream{
    fn read_one(&mut self) -> Result<String, IOError>;
}

impl <R>In<R> where R: Read{
    pub fn new(stdin: R) -> In<R> {
        In(stdin)
    }
}

impl <R>InStream for In<R> where R: Read{
    /// Read one message from stdin and return it as string
    fn read_one(&mut self) -> Result<String, IOError>{

        const length_indicator: usize = 1;
        let stdin = &mut self.0;

        let mut size: [u8;length_indicator] = [0; length_indicator];
        stdin.read_exact(&mut size)?;
        let size = u8::from_be_bytes(size) as usize;

        let mut buffer = vec![0u8; size];
        let _bytes_read = stdin.read_exact(&mut buffer);
        String::from_utf8(buffer).map_err(|_| IOError::new(ErrorKind::InvalidData, "not utf8"))
    }
}
impl <R>Iterator for In<R> where R:Read{
    type Item = String;
    fn next(&mut self) -> Option<String>{
        self.read_one()
            .ok()
    }
}

fn main(){
    let mut in_stream = In::new(stdin());
    loop{
        match in_stream.next(){
            Some(x) => println!("x: {:?}", x),
            None => (),
        }
    }
}

我浏览了 Read 和 BufReader 文档,但似乎没有任何方法可以解决我的问题,因为 read doc 包含以下文本:

这个函数不提供任何关于它是否阻塞等待数据的保证,但是如果一个对象需要阻塞读取并且不能,它通常会通过一个 Err 返回值来发出信号。

如何在标准输入上实现等待数据?

===

编辑:不阻塞和循环给出 UnexpectedEof 错误而不是等待数据的最小用例:

use std::io::{Read, stdin};
fn main(){
    let mut stdin = stdin();
    let mut stdin_handle = stdin.lock();
    loop{
        let mut buffer = vec![0u8; 4];
        let res = stdin_handle.read_exact(&mut buffer);
        println!("res: {:?}", res);
        println!("buffer: {:?}", buffer);
    }

我在 OSX 上通过cargo run &lt; in 运行它,其中in 被命名为管道。我通过echo -n "1234" &gt; in 填充管道。

它等待第一个输入,然后循环。

res: Ok(())
buffer: [49, 50, 51, 52]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
...

我希望程序等到有足够的数据来填充缓冲区。

【问题讨论】:

  • I would like the iterator to read the message when new data comes 是什么意思?您希望它提前读取数据并在被要求时准备好数据?
  • 你给出的引用:“这个函数不提供任何保证[...]”来自特征Read,这是非常笼统的。 不确定,但此警告可能不适用于Stdin 的特定情况,因为在这种情况下,“等待数据”是可能的。
  • @netwave 我想要调用 in_stream.next() 等待块,直到它能够返回数据。我说清楚了吗?
  • StdinLock 实现了BufRead,因此循环读取很简单。那里的实现确实阻塞,所以你不必做任何事情
  • 如果没有足够的数据可用,使用read_exact() 从标准输入读取会阻塞,除非在缓冲区可以填充之前将 EOF 发送到标准输入,在这种情况下它会返回错误。无论您是从无缓冲的还是缓冲的标准输入中读取都无关紧要——无论哪种情况,它都应该阻塞。你看到什么证据表明它没有阻止?你能描述一下我们如何重现这个问题吗?而且,以防万一,您在哪个平台上?

标签: rust io iterator stdin


【解决方案1】:

正如其他人所解释的,Read 上的文档编写得非常笼统,不适用于标准输入, 会阻塞。换句话说,添加了缓冲的代码就可以了。

问题在于你如何使用管道。例如,如果您在一个 shell 中运行 mkfifo foo; cat &lt;foo,在另一个 shell 中运行 echo -n bla &gt;foo,您将看到第一个 shell 中的 cat 将显示 foo 并退出。关闭管道的最后一个写入器会将 EOF 发送给读取器,从而使您的程序的 stdin 无用。

您可以通过在后台启动另一个程序来解决此问题,该程序以写入模式打开管道并且永不退出,例如tail -f /dev/null &gt;pipe-filename。然后echo -n bla &gt;foo 将被您的程序观察到,但不会导致其标准输入关闭。管道写入端的“保持”可能也可以通过 Rust 实现。

【讨论】:

    猜你喜欢
    • 2013-01-25
    • 1970-01-01
    • 2011-12-27
    • 1970-01-01
    • 2018-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多